Compare commits
2 Commits
copilot/fi
...
copilot/fi
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
c06760cf15 | ||
|
|
c684456eba |
@@ -1,13 +0,0 @@
|
||||
name: validate_pr_author_email
|
||||
|
||||
on:
|
||||
pull_request_target:
|
||||
types:
|
||||
- opened
|
||||
- synchronize
|
||||
- reopened
|
||||
|
||||
jobs:
|
||||
validate_pr_author_email:
|
||||
uses: scylladb/github-automation/.github/workflows/validate_pr_author_email.yml@main
|
||||
|
||||
@@ -547,13 +547,17 @@ void set_view_builder(http_context& ctx, routes& r, sharded<db::view::view_build
|
||||
vp.insert(b.second);
|
||||
}
|
||||
}
|
||||
std::vector<sstring> res;
|
||||
replica::database& db = vb.local().get_db();
|
||||
auto uuid = validate_table(db, ks, cf_name);
|
||||
replica::column_family& cf = db.find_column_family(uuid);
|
||||
co_return cf.get_index_manager().list_indexes()
|
||||
| std::views::transform([] (const auto& i) { return i.metadata().name(); })
|
||||
| std::views::filter([&vp] (const auto& n) { return vp.contains(secondary_index::index_table_name(n)); })
|
||||
| std::ranges::to<std::vector>();
|
||||
res.reserve(cf.get_index_manager().list_indexes().size());
|
||||
for (auto&& i : cf.get_index_manager().list_indexes()) {
|
||||
if (vp.contains(secondary_index::index_table_name(i.metadata().name()))) {
|
||||
res.emplace_back(i.metadata().name());
|
||||
}
|
||||
}
|
||||
co_return res;
|
||||
});
|
||||
|
||||
}
|
||||
|
||||
@@ -12,7 +12,6 @@
|
||||
#include <seastar/core/condition-variable.hh>
|
||||
|
||||
#include "schema/schema_fwd.hh"
|
||||
#include "sstables/open_info.hh"
|
||||
#include "compaction_descriptor.hh"
|
||||
|
||||
class reader_permit;
|
||||
@@ -45,7 +44,7 @@ public:
|
||||
virtual compaction_strategy_state& get_compaction_strategy_state() noexcept = 0;
|
||||
virtual reader_permit make_compaction_reader_permit() const = 0;
|
||||
virtual sstables::sstables_manager& get_sstables_manager() noexcept = 0;
|
||||
virtual sstables::shared_sstable make_sstable(sstables::sstable_state) const = 0;
|
||||
virtual sstables::shared_sstable make_sstable() const = 0;
|
||||
virtual sstables::sstable_writer_config configure_writer(sstring origin) const = 0;
|
||||
virtual api::timestamp_type min_memtable_timestamp() const = 0;
|
||||
virtual api::timestamp_type min_memtable_live_timestamp() const = 0;
|
||||
|
||||
@@ -416,9 +416,7 @@ future<compaction_result> compaction_task_executor::compact_sstables(compaction_
|
||||
descriptor.enable_garbage_collection(co_await sstable_set_for_tombstone_gc(t));
|
||||
}
|
||||
descriptor.creator = [&t] (shard_id) {
|
||||
// All compaction types going through this path will work on normal input sstables only.
|
||||
// Off-strategy, for example, waits until the sstables move out of staging state.
|
||||
return t.make_sstable(sstables::sstable_state::normal);
|
||||
return t.make_sstable();
|
||||
};
|
||||
descriptor.replacer = [this, &t, &on_replace, offstrategy] (compaction_completion_desc desc) {
|
||||
t.get_compaction_strategy().notify_completion(t, desc.old_sstables, desc.new_sstables);
|
||||
@@ -1849,10 +1847,6 @@ protected:
|
||||
throw make_compaction_stopped_exception();
|
||||
}
|
||||
}, false);
|
||||
if (utils::get_local_injector().is_enabled("split_sstable_force_stop_exception")) {
|
||||
throw make_compaction_stopped_exception();
|
||||
}
|
||||
|
||||
co_return co_await do_rewrite_sstable(std::move(sst));
|
||||
}
|
||||
};
|
||||
@@ -2290,16 +2284,12 @@ future<compaction_manager::compaction_stats_opt> compaction_manager::perform_spl
|
||||
}
|
||||
|
||||
future<std::vector<sstables::shared_sstable>>
|
||||
compaction_manager::maybe_split_new_sstable(sstables::shared_sstable sst, compaction_group_view& t, compaction_type_options::split opt) {
|
||||
compaction_manager::maybe_split_sstable(sstables::shared_sstable sst, compaction_group_view& t, compaction_type_options::split opt) {
|
||||
if (!split_compaction_task_executor::sstable_needs_split(sst, opt)) {
|
||||
co_return std::vector<sstables::shared_sstable>{sst};
|
||||
}
|
||||
// Throw an error if split cannot be performed due to e.g. out of space prevention.
|
||||
// We don't want to prevent split because compaction is temporarily disabled on a view only for synchronization,
|
||||
// which is uneeded against new sstables that aren't part of any set yet, so never use can_proceed(&t) here.
|
||||
if (is_disabled()) {
|
||||
co_return coroutine::exception(std::make_exception_ptr(std::runtime_error(format("Cannot split {} because manager has compaction disabled, " \
|
||||
"reason might be out of space prevention", sst->get_filename()))));
|
||||
if (!can_proceed(&t)) {
|
||||
co_return std::vector<sstables::shared_sstable>{sst};
|
||||
}
|
||||
std::vector<sstables::shared_sstable> ret;
|
||||
|
||||
@@ -2307,11 +2297,8 @@ compaction_manager::maybe_split_new_sstable(sstables::shared_sstable sst, compac
|
||||
compaction_progress_monitor monitor;
|
||||
compaction_data info = create_compaction_data();
|
||||
compaction_descriptor desc = split_compaction_task_executor::make_descriptor(sst, opt);
|
||||
desc.creator = [&t, sst] (shard_id _) {
|
||||
// NOTE: preserves the sstable state, since we want the output to be on the same state as the original.
|
||||
// For example, if base table has views, it's important that sstable produced by repair will be
|
||||
// in the staging state.
|
||||
return t.make_sstable(sst->state());
|
||||
desc.creator = [&t] (shard_id _) {
|
||||
return t.make_sstable();
|
||||
};
|
||||
desc.replacer = [&] (compaction_completion_desc d) {
|
||||
std::move(d.new_sstables.begin(), d.new_sstables.end(), std::back_inserter(ret));
|
||||
|
||||
@@ -376,8 +376,7 @@ public:
|
||||
// Splits a single SSTable by segregating all its data according to the classifier.
|
||||
// If SSTable doesn't need split, the same input SSTable is returned as output.
|
||||
// If SSTable needs split, then output SSTables are returned and the input SSTable is deleted.
|
||||
// Exception is thrown if the input sstable cannot be split due to e.g. out of space prevention.
|
||||
future<std::vector<sstables::shared_sstable>> maybe_split_new_sstable(sstables::shared_sstable sst, compaction_group_view& t, compaction_type_options::split opt);
|
||||
future<std::vector<sstables::shared_sstable>> maybe_split_sstable(sstables::shared_sstable sst, compaction_group_view& t, compaction_type_options::split opt);
|
||||
|
||||
// Run a custom job for a given table, defined by a function
|
||||
// it completes when future returned by job is ready or returns immediately
|
||||
|
||||
12
configure.py
12
configure.py
@@ -1698,18 +1698,6 @@ deps['test/vector_search/vector_store_client_test'] = ['test/vector_search/vect
|
||||
deps['test/vector_search/load_balancer_test'] = ['test/vector_search/load_balancer_test.cc'] + scylla_tests_dependencies
|
||||
deps['test/vector_search/client_test'] = ['test/vector_search/client_test.cc'] + scylla_tests_dependencies
|
||||
|
||||
boost_tests_prefixes = ["test/boost/", "test/vector_search/", "test/raft/", "test/manual/", "test/ldap/"]
|
||||
|
||||
# We need to link these files to all Boost tests to make sure that
|
||||
# we can execute `--list_json_content` on them. That will produce
|
||||
# a similar result as calling `--list_content={HRF,DOT}`.
|
||||
# Unfortunately, to be able to do that, we're forced to link the
|
||||
# relevant code by hand.
|
||||
for key in deps.keys():
|
||||
for prefix in boost_tests_prefixes:
|
||||
if key.startswith(prefix):
|
||||
deps[key] += ["test/lib/boost_tree_lister_injector.cc", "test/lib/boost_test_tree_lister.cc"]
|
||||
|
||||
wasm_deps = {}
|
||||
|
||||
wasm_deps['wasm/return_input.wat'] = 'test/resource/wasm/rust/return_input.rs'
|
||||
|
||||
@@ -200,7 +200,6 @@ public:
|
||||
static constexpr auto DICTS = "dicts";
|
||||
static constexpr auto VIEW_BUILDING_TASKS = "view_building_tasks";
|
||||
static constexpr auto CLIENT_ROUTES = "client_routes";
|
||||
static constexpr auto VERSIONS = "versions";
|
||||
|
||||
// auth
|
||||
static constexpr auto ROLES = "roles";
|
||||
|
||||
@@ -605,8 +605,8 @@ public:
|
||||
}
|
||||
|
||||
static schema_ptr build_schema() {
|
||||
auto id = generate_legacy_id(system_keyspace::NAME, system_keyspace::VERSIONS);
|
||||
return schema_builder(system_keyspace::NAME, system_keyspace::VERSIONS, std::make_optional(id))
|
||||
auto id = generate_legacy_id(system_keyspace::NAME, "versions");
|
||||
return schema_builder(system_keyspace::NAME, "versions", std::make_optional(id))
|
||||
.with_column("key", utf8_type, column_kind::partition_key)
|
||||
.with_column("version", utf8_type)
|
||||
.with_column("build_mode", utf8_type)
|
||||
|
||||
@@ -2,11 +2,8 @@
|
||||
|
||||
## What is ScyllaDB?
|
||||
|
||||
ScyllaDB is a high-performance NoSQL database optimized for speed and scalability.
|
||||
It is designed to efficiently handle large volumes of data with minimal latency,
|
||||
making it ideal for data-intensive applications.
|
||||
|
||||
ScyllaDB is distributed under the [ScyllaDB Source Available License](https://github.com/scylladb/scylladb/blob/master/LICENSE-ScyllaDB-Source-Available.md).
|
||||
ScyllaDB is a high-performance NoSQL database system, fully compatible with Apache Cassandra.
|
||||
ScyllaDB is released under the GNU Affero General Public License version 3 and the Apache License, ScyllaDB is free and open-source software.
|
||||
|
||||
> [ScyllaDB](http://www.scylladb.com/)
|
||||
|
||||
|
||||
@@ -45,9 +45,7 @@ mutation_partition::mutation_partition(const schema& s, const mutation_partition
|
||||
: _tombstone(x._tombstone)
|
||||
, _static_row(s, column_kind::static_column, x._static_row)
|
||||
, _static_row_continuous(x._static_row_continuous)
|
||||
, _rows(use_single_row_storage(s) ?
|
||||
rows_storage_type(std::optional<deletable_row>{}) :
|
||||
rows_storage_type(rows_type{}))
|
||||
, _rows()
|
||||
, _row_tombstones(x._row_tombstones)
|
||||
#ifdef SEASTAR_DEBUG
|
||||
, _schema_version(s.version())
|
||||
@@ -56,30 +54,10 @@ mutation_partition::mutation_partition(const schema& s, const mutation_partition
|
||||
#ifdef SEASTAR_DEBUG
|
||||
SCYLLA_ASSERT(x._schema_version == _schema_version);
|
||||
#endif
|
||||
if (use_single_row_storage(s)) {
|
||||
// Copy single row if it exists
|
||||
if (x.uses_single_row_storage()) {
|
||||
const auto& x_row = x.get_single_row_storage();
|
||||
if (x_row) {
|
||||
get_single_row_storage() = deletable_row(s, *x_row);
|
||||
}
|
||||
} else if (!x.get_rows_storage().empty()) {
|
||||
// Converting from multi-row to single-row - take the first row
|
||||
// This shouldn't normally happen as schema doesn't change this way
|
||||
on_internal_error(mplog, "mutation_partition: cannot convert multi-row partition to single-row");
|
||||
}
|
||||
} else {
|
||||
// Multi-row storage
|
||||
if (x.uses_single_row_storage()) {
|
||||
// Converting from single-row to multi-row - this shouldn't normally happen
|
||||
on_internal_error(mplog, "mutation_partition: cannot convert single-row partition to multi-row");
|
||||
} else {
|
||||
auto cloner = [&s] (const rows_entry* x) -> rows_entry* {
|
||||
return current_allocator().construct<rows_entry>(s, *x);
|
||||
};
|
||||
get_rows_storage().clone_from(x.get_rows_storage(), cloner, current_deleter<rows_entry>());
|
||||
}
|
||||
}
|
||||
auto cloner = [&s] (const rows_entry* x) -> rows_entry* {
|
||||
return current_allocator().construct<rows_entry>(s, *x);
|
||||
};
|
||||
_rows.clone_from(x._rows, cloner, current_deleter<rows_entry>());
|
||||
}
|
||||
|
||||
mutation_partition::mutation_partition(const mutation_partition& x, const schema& schema,
|
||||
@@ -87,9 +65,7 @@ mutation_partition::mutation_partition(const mutation_partition& x, const schema
|
||||
: _tombstone(x._tombstone)
|
||||
, _static_row(schema, column_kind::static_column, x._static_row)
|
||||
, _static_row_continuous(x._static_row_continuous)
|
||||
, _rows(use_single_row_storage(schema) ?
|
||||
rows_storage_type(std::optional<deletable_row>{}) :
|
||||
rows_storage_type(rows_type{}))
|
||||
, _rows()
|
||||
, _row_tombstones(x._row_tombstones, range_tombstone_list::copy_comparator_only())
|
||||
#ifdef SEASTAR_DEBUG
|
||||
, _schema_version(schema.version())
|
||||
@@ -98,37 +74,19 @@ mutation_partition::mutation_partition(const mutation_partition& x, const schema
|
||||
#ifdef SEASTAR_DEBUG
|
||||
SCYLLA_ASSERT(x._schema_version == _schema_version);
|
||||
#endif
|
||||
if (use_single_row_storage(schema)) {
|
||||
// Single-row storage: just copy the row if it exists
|
||||
if (x.uses_single_row_storage()) {
|
||||
const auto& x_row = x.get_single_row_storage();
|
||||
if (x_row) {
|
||||
get_single_row_storage() = deletable_row(schema, *x_row);
|
||||
try {
|
||||
for(auto&& r : ck_ranges) {
|
||||
for (const rows_entry& e : x.range(schema, r)) {
|
||||
auto ce = alloc_strategy_unique_ptr<rows_entry>(current_allocator().construct<rows_entry>(schema, e));
|
||||
_rows.insert_before_hint(_rows.end(), std::move(ce), rows_entry::tri_compare(schema));
|
||||
}
|
||||
} else {
|
||||
// Filtering from multi-row - shouldn't happen with consistent schema
|
||||
on_internal_error(mplog, "mutation_partition: filtering from multi-row to single-row storage");
|
||||
}
|
||||
} else {
|
||||
// Multi-row storage with filtering
|
||||
if (x.uses_single_row_storage()) {
|
||||
on_internal_error(mplog, "mutation_partition: filtering from single-row to multi-row storage");
|
||||
} else {
|
||||
try {
|
||||
for(auto&& r : ck_ranges) {
|
||||
for (const rows_entry& e : x.range(schema, r)) {
|
||||
auto ce = alloc_strategy_unique_ptr<rows_entry>(current_allocator().construct<rows_entry>(schema, e));
|
||||
get_rows_storage().insert_before_hint(get_rows_storage().end(), std::move(ce), rows_entry::tri_compare(schema));
|
||||
}
|
||||
for (auto&& rt : x._row_tombstones.slice(schema, r)) {
|
||||
_row_tombstones.apply(schema, rt.tombstone());
|
||||
}
|
||||
}
|
||||
} catch (...) {
|
||||
get_rows_storage().clear_and_dispose(current_deleter<rows_entry>());
|
||||
throw;
|
||||
for (auto&& rt : x._row_tombstones.slice(schema, r)) {
|
||||
_row_tombstones.apply(schema, rt.tombstone());
|
||||
}
|
||||
}
|
||||
} catch (...) {
|
||||
_rows.clear_and_dispose(current_deleter<rows_entry>());
|
||||
throw;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -146,20 +104,14 @@ mutation_partition::mutation_partition(mutation_partition&& x, const schema& sch
|
||||
#ifdef SEASTAR_DEBUG
|
||||
SCYLLA_ASSERT(x._schema_version == _schema_version);
|
||||
#endif
|
||||
if (use_single_row_storage(schema)) {
|
||||
// Single-row storage: no filtering needed, row either exists or doesn't
|
||||
// The move constructor has already moved the row if it exists
|
||||
} else {
|
||||
// Multi-row storage: filter the rows
|
||||
if (!uses_single_row_storage()) {
|
||||
auto deleter = current_deleter<rows_entry>();
|
||||
auto it = get_rows_storage().begin();
|
||||
for (auto&& range : ck_ranges.ranges()) {
|
||||
get_rows_storage().erase_and_dispose(it, lower_bound(schema, range), deleter);
|
||||
it = upper_bound(schema, range);
|
||||
}
|
||||
get_rows_storage().erase_and_dispose(it, get_rows_storage().end(), deleter);
|
||||
{
|
||||
auto deleter = current_deleter<rows_entry>();
|
||||
auto it = _rows.begin();
|
||||
for (auto&& range : ck_ranges.ranges()) {
|
||||
_rows.erase_and_dispose(it, lower_bound(schema, range), deleter);
|
||||
it = upper_bound(schema, range);
|
||||
}
|
||||
_rows.erase_and_dispose(it, _rows.end(), deleter);
|
||||
}
|
||||
{
|
||||
for (auto&& range : ck_ranges.ranges()) {
|
||||
@@ -175,11 +127,7 @@ mutation_partition::mutation_partition(mutation_partition&& x, const schema& sch
|
||||
}
|
||||
|
||||
mutation_partition::~mutation_partition() {
|
||||
if (uses_single_row_storage()) {
|
||||
// Single-row storage: optional destructor handles cleanup
|
||||
} else {
|
||||
get_rows_storage().clear_and_dispose(current_deleter<rows_entry>());
|
||||
}
|
||||
_rows.clear_and_dispose(current_deleter<rows_entry>());
|
||||
}
|
||||
|
||||
mutation_partition&
|
||||
@@ -193,14 +141,10 @@ mutation_partition::operator=(mutation_partition&& x) noexcept {
|
||||
|
||||
void mutation_partition::ensure_last_dummy(const schema& s) {
|
||||
check_schema(s);
|
||||
if (uses_single_row_storage()) {
|
||||
// Single-row storage doesn't use dummy entries
|
||||
return;
|
||||
}
|
||||
if (get_rows_storage().empty() || !get_rows_storage().rbegin()->is_last_dummy()) {
|
||||
if (_rows.empty() || !_rows.rbegin()->is_last_dummy()) {
|
||||
auto e = alloc_strategy_unique_ptr<rows_entry>(
|
||||
current_allocator().construct<rows_entry>(s, rows_entry::last_dummy_tag(), is_continuous::yes));
|
||||
get_rows_storage().insert_before(get_rows_storage().end(), std::move(e));
|
||||
_rows.insert_before(_rows.end(), std::move(e));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -475,18 +419,9 @@ mutation_partition::tombstone_for_row(const schema& schema, const clustering_key
|
||||
check_schema(schema);
|
||||
row_tombstone t = row_tombstone(range_tombstone_for_row(schema, key));
|
||||
|
||||
if (use_single_row_storage(schema)) {
|
||||
// Single-row storage: check if the single row exists and has tombstone
|
||||
const auto& row_opt = get_single_row_storage();
|
||||
if (row_opt) {
|
||||
t.apply(row_opt->deleted_at(), row_opt->marker());
|
||||
}
|
||||
} else {
|
||||
// Multi-row storage: search in B-tree
|
||||
auto j = get_rows_storage().find(key, rows_entry::tri_compare(schema));
|
||||
if (j != get_rows_storage().end()) {
|
||||
t.apply(j->row().deleted_at(), j->row().marker());
|
||||
}
|
||||
auto j = _rows.find(key, rows_entry::tri_compare(schema));
|
||||
if (j != _rows.end()) {
|
||||
t.apply(j->row().deleted_at(), j->row().marker());
|
||||
}
|
||||
|
||||
return t;
|
||||
@@ -569,178 +504,97 @@ void mutation_partition::apply_insert(const schema& s, clustering_key_view key,
|
||||
clustered_row(s, key).apply(row_marker(created_at, ttl, expiry));
|
||||
}
|
||||
void mutation_partition::insert_row(const schema& s, const clustering_key& key, deletable_row&& row) {
|
||||
if (use_single_row_storage(s)) {
|
||||
// Single-row storage: just set the row
|
||||
get_single_row_storage() = std::move(row);
|
||||
} else {
|
||||
// Multi-row storage: insert into B-tree
|
||||
auto e = alloc_strategy_unique_ptr<rows_entry>(
|
||||
current_allocator().construct<rows_entry>(key, std::move(row)));
|
||||
get_rows_storage().insert_before_hint(get_rows_storage().end(), std::move(e), rows_entry::tri_compare(s));
|
||||
}
|
||||
auto e = alloc_strategy_unique_ptr<rows_entry>(
|
||||
current_allocator().construct<rows_entry>(key, std::move(row)));
|
||||
_rows.insert_before_hint(_rows.end(), std::move(e), rows_entry::tri_compare(s));
|
||||
}
|
||||
|
||||
void mutation_partition::insert_row(const schema& s, const clustering_key& key, const deletable_row& row) {
|
||||
check_schema(s);
|
||||
if (use_single_row_storage(s)) {
|
||||
// Single-row storage: just copy the row
|
||||
get_single_row_storage() = row;
|
||||
} else {
|
||||
// Multi-row storage: insert into B-tree
|
||||
auto e = alloc_strategy_unique_ptr<rows_entry>(
|
||||
current_allocator().construct<rows_entry>(s, key, row));
|
||||
get_rows_storage().insert_before_hint(get_rows_storage().end(), std::move(e), rows_entry::tri_compare(s));
|
||||
}
|
||||
auto e = alloc_strategy_unique_ptr<rows_entry>(
|
||||
current_allocator().construct<rows_entry>(s, key, row));
|
||||
_rows.insert_before_hint(_rows.end(), std::move(e), rows_entry::tri_compare(s));
|
||||
}
|
||||
|
||||
const row*
|
||||
mutation_partition::find_row(const schema& s, const clustering_key& key) const {
|
||||
check_schema(s);
|
||||
if (use_single_row_storage(s)) {
|
||||
// Single-row storage: return the single row's cells if it exists
|
||||
const auto& row_opt = get_single_row_storage();
|
||||
if (row_opt) {
|
||||
return &row_opt->cells();
|
||||
}
|
||||
auto i = _rows.find(key, rows_entry::tri_compare(s));
|
||||
if (i == _rows.end()) {
|
||||
return nullptr;
|
||||
} else {
|
||||
// Multi-row storage: search in B-tree
|
||||
auto i = get_rows_storage().find(key, rows_entry::tri_compare(s));
|
||||
if (i == get_rows_storage().end()) {
|
||||
return nullptr;
|
||||
}
|
||||
return &i->row().cells();
|
||||
}
|
||||
return &i->row().cells();
|
||||
}
|
||||
|
||||
deletable_row&
|
||||
mutation_partition::clustered_row(const schema& s, clustering_key&& key) {
|
||||
check_schema(s);
|
||||
check_row_key(s, key, is_dummy::no);
|
||||
|
||||
if (use_single_row_storage(s)) {
|
||||
// Single-row storage: create row if it doesn't exist
|
||||
auto& row_opt = get_single_row_storage();
|
||||
if (!row_opt) {
|
||||
row_opt = deletable_row();
|
||||
}
|
||||
return *row_opt;
|
||||
} else {
|
||||
// Multi-row storage: find or insert in B-tree
|
||||
auto i = get_rows_storage().find(key, rows_entry::tri_compare(s));
|
||||
if (i == get_rows_storage().end()) {
|
||||
auto e = alloc_strategy_unique_ptr<rows_entry>(
|
||||
current_allocator().construct<rows_entry>(std::move(key)));
|
||||
i = get_rows_storage().insert_before_hint(i, std::move(e), rows_entry::tri_compare(s)).first;
|
||||
}
|
||||
return i->row();
|
||||
auto i = _rows.find(key, rows_entry::tri_compare(s));
|
||||
if (i == _rows.end()) {
|
||||
auto e = alloc_strategy_unique_ptr<rows_entry>(
|
||||
current_allocator().construct<rows_entry>(std::move(key)));
|
||||
i = _rows.insert_before_hint(i, std::move(e), rows_entry::tri_compare(s)).first;
|
||||
}
|
||||
return i->row();
|
||||
}
|
||||
|
||||
deletable_row&
|
||||
mutation_partition::clustered_row(const schema& s, const clustering_key& key) {
|
||||
check_schema(s);
|
||||
check_row_key(s, key, is_dummy::no);
|
||||
|
||||
if (use_single_row_storage(s)) {
|
||||
// Single-row storage: create row if it doesn't exist
|
||||
auto& row_opt = get_single_row_storage();
|
||||
if (!row_opt) {
|
||||
row_opt = deletable_row();
|
||||
}
|
||||
return *row_opt;
|
||||
} else {
|
||||
// Multi-row storage: find or insert in B-tree
|
||||
auto i = get_rows_storage().find(key, rows_entry::tri_compare(s));
|
||||
if (i == get_rows_storage().end()) {
|
||||
auto e = alloc_strategy_unique_ptr<rows_entry>(
|
||||
current_allocator().construct<rows_entry>(key));
|
||||
i = get_rows_storage().insert_before_hint(i, std::move(e), rows_entry::tri_compare(s)).first;
|
||||
}
|
||||
return i->row();
|
||||
auto i = _rows.find(key, rows_entry::tri_compare(s));
|
||||
if (i == _rows.end()) {
|
||||
auto e = alloc_strategy_unique_ptr<rows_entry>(
|
||||
current_allocator().construct<rows_entry>(key));
|
||||
i = _rows.insert_before_hint(i, std::move(e), rows_entry::tri_compare(s)).first;
|
||||
}
|
||||
return i->row();
|
||||
}
|
||||
|
||||
deletable_row&
|
||||
mutation_partition::clustered_row(const schema& s, clustering_key_view key) {
|
||||
check_schema(s);
|
||||
check_row_key(s, key, is_dummy::no);
|
||||
|
||||
if (use_single_row_storage(s)) {
|
||||
// Single-row storage: create row if it doesn't exist
|
||||
auto& row_opt = get_single_row_storage();
|
||||
if (!row_opt) {
|
||||
row_opt = deletable_row();
|
||||
}
|
||||
return *row_opt;
|
||||
} else {
|
||||
// Multi-row storage: find or insert in B-tree
|
||||
auto i = get_rows_storage().find(key, rows_entry::tri_compare(s));
|
||||
if (i == get_rows_storage().end()) {
|
||||
auto e = alloc_strategy_unique_ptr<rows_entry>(
|
||||
current_allocator().construct<rows_entry>(key));
|
||||
i = get_rows_storage().insert_before_hint(i, std::move(e), rows_entry::tri_compare(s)).first;
|
||||
}
|
||||
return i->row();
|
||||
auto i = _rows.find(key, rows_entry::tri_compare(s));
|
||||
if (i == _rows.end()) {
|
||||
auto e = alloc_strategy_unique_ptr<rows_entry>(
|
||||
current_allocator().construct<rows_entry>(key));
|
||||
i = _rows.insert_before_hint(i, std::move(e), rows_entry::tri_compare(s)).first;
|
||||
}
|
||||
return i->row();
|
||||
}
|
||||
|
||||
rows_entry&
|
||||
mutation_partition::clustered_rows_entry(const schema& s, position_in_partition_view pos, is_dummy dummy, is_continuous continuous) {
|
||||
check_schema(s);
|
||||
check_row_key(s, pos, dummy);
|
||||
|
||||
if (use_single_row_storage(s)) {
|
||||
// Single-row storage doesn't use rows_entry - this shouldn't be called
|
||||
on_internal_error(mplog, "mutation_partition::clustered_rows_entry() called with single-row storage");
|
||||
}
|
||||
|
||||
auto i = get_rows_storage().find(pos, rows_entry::tri_compare(s));
|
||||
if (i == get_rows_storage().end()) {
|
||||
auto i = _rows.find(pos, rows_entry::tri_compare(s));
|
||||
if (i == _rows.end()) {
|
||||
auto e = alloc_strategy_unique_ptr<rows_entry>(
|
||||
current_allocator().construct<rows_entry>(s, pos, dummy, continuous));
|
||||
i = get_rows_storage().insert_before_hint(i, std::move(e), rows_entry::tri_compare(s)).first;
|
||||
i = _rows.insert_before_hint(i, std::move(e), rows_entry::tri_compare(s)).first;
|
||||
}
|
||||
return *i;
|
||||
}
|
||||
|
||||
deletable_row&
|
||||
mutation_partition::clustered_row(const schema& s, position_in_partition_view pos, is_dummy dummy, is_continuous continuous) {
|
||||
if (use_single_row_storage(s)) {
|
||||
// Single-row storage: ignore dummy/continuous flags, just get/create the row
|
||||
check_row_key(s, pos, dummy);
|
||||
auto& row_opt = get_single_row_storage();
|
||||
if (!row_opt) {
|
||||
row_opt = deletable_row();
|
||||
}
|
||||
return *row_opt;
|
||||
} else {
|
||||
return clustered_rows_entry(s, pos, dummy, continuous).row();
|
||||
}
|
||||
return clustered_rows_entry(s, pos, dummy, continuous).row();
|
||||
}
|
||||
|
||||
deletable_row&
|
||||
mutation_partition::append_clustered_row(const schema& s, position_in_partition_view pos, is_dummy dummy, is_continuous continuous) {
|
||||
check_schema(s);
|
||||
check_row_key(s, pos, dummy);
|
||||
|
||||
if (use_single_row_storage(s)) {
|
||||
// Single-row storage: just create/get the row
|
||||
auto& row_opt = get_single_row_storage();
|
||||
if (!row_opt) {
|
||||
row_opt = deletable_row();
|
||||
}
|
||||
return *row_opt;
|
||||
}
|
||||
|
||||
const auto cmp = rows_entry::tri_compare(s);
|
||||
auto i = get_rows_storage().end();
|
||||
if (!get_rows_storage().empty() && (cmp(*std::prev(i), pos) >= 0)) {
|
||||
auto i = _rows.end();
|
||||
if (!_rows.empty() && (cmp(*std::prev(i), pos) >= 0)) {
|
||||
on_internal_error(mplog, format("mutation_partition::append_clustered_row(): cannot append clustering row with key {} to the partition"
|
||||
", last clustering row is equal or greater: {}", pos, std::prev(i)->position()));
|
||||
}
|
||||
auto e = alloc_strategy_unique_ptr<rows_entry>(current_allocator().construct<rows_entry>(s, pos, dummy, continuous));
|
||||
i = get_rows_storage().insert_before_hint(i, std::move(e), cmp).first;
|
||||
i = _rows.insert_before_hint(i, std::move(e), cmp).first;
|
||||
|
||||
return i->row();
|
||||
}
|
||||
@@ -748,33 +602,19 @@ mutation_partition::append_clustered_row(const schema& s, position_in_partition_
|
||||
mutation_partition::rows_type::const_iterator
|
||||
mutation_partition::lower_bound(const schema& schema, const query::clustering_range& r) const {
|
||||
check_schema(schema);
|
||||
|
||||
if (use_single_row_storage(schema)) {
|
||||
// Single-row storage: always return end iterator (empty range)
|
||||
static const rows_type empty_rows;
|
||||
return empty_rows.end();
|
||||
}
|
||||
|
||||
if (!r.start()) {
|
||||
return std::cbegin(get_rows_storage());
|
||||
return std::cbegin(_rows);
|
||||
}
|
||||
return get_rows_storage().lower_bound(position_in_partition_view::for_range_start(r), rows_entry::tri_compare(schema));
|
||||
return _rows.lower_bound(position_in_partition_view::for_range_start(r), rows_entry::tri_compare(schema));
|
||||
}
|
||||
|
||||
mutation_partition::rows_type::const_iterator
|
||||
mutation_partition::upper_bound(const schema& schema, const query::clustering_range& r) const {
|
||||
check_schema(schema);
|
||||
|
||||
if (use_single_row_storage(schema)) {
|
||||
// Single-row storage: always return end iterator (empty range)
|
||||
static const rows_type empty_rows;
|
||||
return empty_rows.end();
|
||||
}
|
||||
|
||||
if (!r.end()) {
|
||||
return std::cend(get_rows_storage());
|
||||
return std::cend(_rows);
|
||||
}
|
||||
return get_rows_storage().lower_bound(position_in_partition_view::for_range_end(r), rows_entry::tri_compare(schema));
|
||||
return _rows.lower_bound(position_in_partition_view::for_range_end(r), rows_entry::tri_compare(schema));
|
||||
}
|
||||
|
||||
std::ranges::subrange<mutation_partition::rows_type::const_iterator>
|
||||
@@ -785,32 +625,17 @@ mutation_partition::range(const schema& schema, const query::clustering_range& r
|
||||
|
||||
std::ranges::subrange<mutation_partition::rows_type::iterator>
|
||||
mutation_partition::range(const schema& schema, const query::clustering_range& r) {
|
||||
if (use_single_row_storage(schema)) {
|
||||
// Single-row storage: return empty range (rows_entry iteration not applicable)
|
||||
static rows_type empty_rows;
|
||||
return std::ranges::subrange(empty_rows.begin(), empty_rows.end());
|
||||
}
|
||||
return unconst(get_rows_storage(), static_cast<const mutation_partition*>(this)->range(schema, r));
|
||||
return unconst(_rows, static_cast<const mutation_partition*>(this)->range(schema, r));
|
||||
}
|
||||
|
||||
mutation_partition::rows_type::iterator
|
||||
mutation_partition::lower_bound(const schema& schema, const query::clustering_range& r) {
|
||||
if (use_single_row_storage(schema)) {
|
||||
// Single-row storage: return end iterator (empty range)
|
||||
static rows_type empty_rows;
|
||||
return empty_rows.end();
|
||||
}
|
||||
return unconst(get_rows_storage(), static_cast<const mutation_partition*>(this)->lower_bound(schema, r));
|
||||
return unconst(_rows, static_cast<const mutation_partition*>(this)->lower_bound(schema, r));
|
||||
}
|
||||
|
||||
mutation_partition::rows_type::iterator
|
||||
mutation_partition::upper_bound(const schema& schema, const query::clustering_range& r) {
|
||||
if (use_single_row_storage(schema)) {
|
||||
// Single-row storage: return end iterator (empty range)
|
||||
static rows_type empty_rows;
|
||||
return empty_rows.end();
|
||||
}
|
||||
return unconst(get_rows_storage(), static_cast<const mutation_partition*>(this)->upper_bound(schema, r));
|
||||
return unconst(_rows, static_cast<const mutation_partition*>(this)->upper_bound(schema, r));
|
||||
}
|
||||
|
||||
template<typename Func>
|
||||
@@ -1552,15 +1377,7 @@ bool mutation_partition::empty() const
|
||||
if (_tombstone.timestamp != api::missing_timestamp) {
|
||||
return false;
|
||||
}
|
||||
if (_static_row.size() || !_row_tombstones.empty()) {
|
||||
return false;
|
||||
}
|
||||
|
||||
if (uses_single_row_storage()) {
|
||||
return !get_single_row_storage().has_value();
|
||||
} else {
|
||||
return get_rows_storage().empty();
|
||||
}
|
||||
return !_static_row.size() && _rows.empty() && _row_tombstones.empty();
|
||||
}
|
||||
|
||||
bool
|
||||
@@ -1605,11 +1422,7 @@ mutation_partition::live_row_count(const schema& s, gc_clock::time_point query_t
|
||||
|
||||
uint64_t
|
||||
mutation_partition::row_count() const {
|
||||
if (uses_single_row_storage()) {
|
||||
return get_single_row_storage().has_value() ? 1 : 0;
|
||||
} else {
|
||||
return get_rows_storage().calculate_size();
|
||||
}
|
||||
return _rows.calculate_size();
|
||||
}
|
||||
|
||||
rows_entry::rows_entry(rows_entry&& o) noexcept
|
||||
@@ -2406,22 +2219,15 @@ public:
|
||||
mutation_partition::mutation_partition(mutation_partition::incomplete_tag, const schema& s, tombstone t)
|
||||
: _tombstone(t)
|
||||
, _static_row_continuous(!s.has_static_columns())
|
||||
, _rows(use_single_row_storage(s) ?
|
||||
rows_storage_type(std::optional<deletable_row>{}) :
|
||||
rows_storage_type(rows_type{}))
|
||||
, _rows()
|
||||
, _row_tombstones(s)
|
||||
#ifdef SEASTAR_DEBUG
|
||||
, _schema_version(s.version())
|
||||
#endif
|
||||
{
|
||||
if (use_single_row_storage(s)) {
|
||||
// Single-row storage: no dummy entries needed, leave row as empty optional
|
||||
} else {
|
||||
// Multi-row storage: add last dummy entry for discontinuous partition
|
||||
auto e = alloc_strategy_unique_ptr<rows_entry>(
|
||||
current_allocator().construct<rows_entry>(s, rows_entry::last_dummy_tag(), is_continuous::no));
|
||||
get_rows_storage().insert_before(get_rows_storage().end(), std::move(e));
|
||||
}
|
||||
auto e = alloc_strategy_unique_ptr<rows_entry>(
|
||||
current_allocator().construct<rows_entry>(s, rows_entry::last_dummy_tag(), is_continuous::no));
|
||||
_rows.insert_before(_rows.end(), std::move(e));
|
||||
}
|
||||
|
||||
bool mutation_partition::is_fully_continuous() const {
|
||||
|
||||
@@ -9,7 +9,6 @@
|
||||
#pragma once
|
||||
|
||||
#include <iosfwd>
|
||||
#include <variant>
|
||||
#include <boost/intrusive/parent_from_member.hpp>
|
||||
|
||||
#include <seastar/util/optimized_optional.hh>
|
||||
@@ -1189,12 +1188,6 @@ inline void check_row_key(const schema& s, position_in_partition_view pos, is_du
|
||||
}
|
||||
}
|
||||
|
||||
// Returns true if the schema has no clustering keys, meaning partitions can have at most one row.
|
||||
// When true, mutation_partition uses std::optional<deletable_row> instead of full rows_type container.
|
||||
inline bool use_single_row_storage(const schema& s) {
|
||||
return s.clustering_key_size() == 0;
|
||||
}
|
||||
|
||||
// Represents a set of writes made to a single partition.
|
||||
//
|
||||
// The object is schema-dependent. Each instance is governed by some
|
||||
@@ -1235,45 +1228,20 @@ inline bool use_single_row_storage(const schema& s) {
|
||||
class mutation_partition final {
|
||||
public:
|
||||
using rows_type = rows_entry::container_type;
|
||||
using rows_storage_type = std::variant<rows_type, std::optional<deletable_row>>;
|
||||
friend class size_calculator;
|
||||
private:
|
||||
tombstone _tombstone;
|
||||
lazy_row _static_row;
|
||||
bool _static_row_continuous = true;
|
||||
rows_storage_type _rows;
|
||||
rows_type _rows;
|
||||
// Contains only strict prefixes so that we don't have to lookup full keys
|
||||
// in both _row_tombstones and _rows.
|
||||
// Note: empty when using single-row storage (std::optional<deletable_row> variant)
|
||||
range_tombstone_list _row_tombstones;
|
||||
#ifdef SEASTAR_DEBUG
|
||||
table_schema_version _schema_version;
|
||||
#endif
|
||||
|
||||
friend class converting_mutation_partition_applier;
|
||||
|
||||
// Returns true if this partition uses single-row storage
|
||||
bool uses_single_row_storage() const {
|
||||
return std::holds_alternative<std::optional<deletable_row>>(_rows);
|
||||
}
|
||||
|
||||
// Get reference to rows container (multi-row storage)
|
||||
rows_type& get_rows_storage() {
|
||||
return std::get<rows_type>(_rows);
|
||||
}
|
||||
|
||||
const rows_type& get_rows_storage() const {
|
||||
return std::get<rows_type>(_rows);
|
||||
}
|
||||
|
||||
// Get reference to single row storage
|
||||
std::optional<deletable_row>& get_single_row_storage() {
|
||||
return std::get<std::optional<deletable_row>>(_rows);
|
||||
}
|
||||
|
||||
const std::optional<deletable_row>& get_single_row_storage() const {
|
||||
return std::get<std::optional<deletable_row>>(_rows);
|
||||
}
|
||||
public:
|
||||
struct copy_comparators_only {};
|
||||
struct incomplete_tag {};
|
||||
@@ -1283,14 +1251,14 @@ public:
|
||||
return mutation_partition(incomplete_tag(), s, t);
|
||||
}
|
||||
mutation_partition(const schema& s)
|
||||
: _rows(use_single_row_storage(s) ? rows_storage_type(std::optional<deletable_row>{}) : rows_storage_type(rows_type{}))
|
||||
: _rows()
|
||||
, _row_tombstones(s)
|
||||
#ifdef SEASTAR_DEBUG
|
||||
, _schema_version(s.version())
|
||||
#endif
|
||||
{ }
|
||||
mutation_partition(mutation_partition& other, copy_comparators_only)
|
||||
: _rows(other._rows.index() == 0 ? rows_storage_type(rows_type{}) : rows_storage_type(std::optional<deletable_row>{}))
|
||||
: _rows()
|
||||
, _row_tombstones(other._row_tombstones, range_tombstone_list::copy_comparator_only())
|
||||
#ifdef SEASTAR_DEBUG
|
||||
, _schema_version(other._schema_version)
|
||||
@@ -1301,8 +1269,6 @@ public:
|
||||
mutation_partition(const mutation_partition&, const schema&, query::clustering_key_filter_ranges);
|
||||
mutation_partition(mutation_partition&&, const schema&, query::clustering_key_filter_ranges);
|
||||
~mutation_partition();
|
||||
// Returns the mutation_partition containing the given rows_type.
|
||||
// Can only be used when the mutation_partition uses multi-row storage.
|
||||
static mutation_partition& container_of(rows_type&);
|
||||
mutation_partition& operator=(mutation_partition&& x) noexcept;
|
||||
bool equal(const schema&, const mutation_partition&) const;
|
||||
@@ -1496,31 +1462,9 @@ public:
|
||||
const lazy_row& static_row() const { return _static_row; }
|
||||
|
||||
// return a set of rows_entry where each entry represents a CQL row sharing the same clustering key.
|
||||
// For single-row storage (clustering_key_size() == 0), returns an empty container.
|
||||
// Callers should check uses_single_row_storage() and use get_single_row() for single-row case.
|
||||
const rows_type& clustered_rows() const noexcept {
|
||||
if (uses_single_row_storage()) {
|
||||
static const rows_type empty_rows;
|
||||
return empty_rows;
|
||||
}
|
||||
return get_rows_storage();
|
||||
}
|
||||
utils::immutable_collection<rows_type> clustered_rows() noexcept {
|
||||
return const_cast<const mutation_partition*>(this)->clustered_rows();
|
||||
}
|
||||
rows_type& mutable_clustered_rows() noexcept {
|
||||
// Should only be called when NOT using single-row storage
|
||||
return get_rows_storage();
|
||||
}
|
||||
|
||||
// Access the single row when using single-row storage (clustering_key_size() == 0)
|
||||
const std::optional<deletable_row>& get_single_row() const {
|
||||
return get_single_row_storage();
|
||||
}
|
||||
|
||||
std::optional<deletable_row>& get_single_row() {
|
||||
return get_single_row_storage();
|
||||
}
|
||||
const rows_type& clustered_rows() const noexcept { return _rows; }
|
||||
utils::immutable_collection<rows_type> clustered_rows() noexcept { return _rows; }
|
||||
rows_type& mutable_clustered_rows() noexcept { return _rows; }
|
||||
|
||||
const range_tombstone_list& row_tombstones() const noexcept { return _row_tombstones; }
|
||||
utils::immutable_collection<range_tombstone_list> row_tombstones() noexcept { return _row_tombstones; }
|
||||
@@ -1538,14 +1482,8 @@ public:
|
||||
rows_type::iterator upper_bound(const schema& schema, const query::clustering_range& r);
|
||||
std::ranges::subrange<rows_type::iterator> range(const schema& schema, const query::clustering_range& r);
|
||||
// Returns an iterator range of rows_entry, with only non-dummy entries.
|
||||
// For single-row storage, returns an empty range.
|
||||
auto non_dummy_rows() const {
|
||||
if (uses_single_row_storage()) {
|
||||
static const rows_type empty_rows;
|
||||
return std::ranges::subrange(empty_rows.begin(), empty_rows.end())
|
||||
| std::views::filter([] (const rows_entry& e) { return bool(!e.dummy()); });
|
||||
}
|
||||
return std::ranges::subrange(get_rows_storage().begin(), get_rows_storage().end())
|
||||
return std::ranges::subrange(_rows.begin(), _rows.end())
|
||||
| std::views::filter([] (const rows_entry& e) { return bool(!e.dummy()); });
|
||||
}
|
||||
void accept(const schema&, mutation_partition_visitor&) const;
|
||||
@@ -1579,21 +1517,7 @@ private:
|
||||
|
||||
inline
|
||||
mutation_partition& mutation_partition::container_of(rows_type& rows) {
|
||||
// This method can only be called when using multi-row storage (rows_type variant alternative).
|
||||
// With std::variant, when rows_type is the active alternative (index 0), it's stored at the beginning of the variant.
|
||||
// We can use pointer arithmetic to get back to the mutation_partition.
|
||||
|
||||
// Calculate offset from rows_type to the containing variant
|
||||
// The rows reference should be the active rows_type inside the variant
|
||||
static_assert(std::is_same_v<std::variant_alternative_t<0, rows_storage_type>, rows_type>,
|
||||
"rows_type must be the first alternative in rows_storage_type");
|
||||
|
||||
// Get address of the variant containing this rows_type
|
||||
// When rows_type is active (index 0), it's at offset 0 in the variant's storage
|
||||
rows_storage_type* variant_ptr = reinterpret_cast<rows_storage_type*>(&rows);
|
||||
|
||||
// Now get the mutation_partition from the variant
|
||||
return *boost::intrusive::get_parent_from_member(variant_ptr, &mutation_partition::_rows);
|
||||
return *boost::intrusive::get_parent_from_member(&rows, &mutation_partition::_rows);
|
||||
}
|
||||
|
||||
bool has_any_live_data(const schema& s, column_kind kind, const row& cells, tombstone tomb = tombstone(),
|
||||
|
||||
@@ -1195,8 +1195,6 @@ private:
|
||||
rlogger.info("{}", msg);
|
||||
throw std::runtime_error(msg);
|
||||
}
|
||||
|
||||
co_await utils::get_local_injector().inject("incremental_repair_prepare_wait", utils::wait_for_message(60s));
|
||||
auto reenablers_and_holders = co_await table.get_compaction_reenablers_and_lock_holders_for_repair(_db.local(), _frozen_topology_guard, _range);
|
||||
for (auto& lock_holder : reenablers_and_holders.lock_holders) {
|
||||
_rs._repair_compaction_locks[_frozen_topology_guard].push_back(std::move(lock_holder));
|
||||
|
||||
@@ -84,10 +84,6 @@ class compaction_group {
|
||||
seastar::named_gate _async_gate;
|
||||
// Gates flushes.
|
||||
seastar::named_gate _flush_gate;
|
||||
// Gates sstable being added to the group.
|
||||
// This prevents the group from being considered empty when sstables are being added.
|
||||
// Crucial for tablet split which ACKs split for a table when all pre-split groups are empty.
|
||||
seastar::named_gate _sstable_add_gate;
|
||||
bool _tombstone_gc_enabled = true;
|
||||
std::optional<compaction::compaction_backlog_tracker> _backlog_tracker;
|
||||
repair_classifier_func _repair_sstable_classifier;
|
||||
@@ -252,10 +248,6 @@ public:
|
||||
return _flush_gate;
|
||||
}
|
||||
|
||||
seastar::named_gate& sstable_add_gate() noexcept {
|
||||
return _sstable_add_gate;
|
||||
}
|
||||
|
||||
compaction::compaction_manager& get_compaction_manager() noexcept;
|
||||
const compaction::compaction_manager& get_compaction_manager() const noexcept;
|
||||
|
||||
@@ -442,7 +434,7 @@ public:
|
||||
virtual bool all_storage_groups_split() = 0;
|
||||
virtual future<> split_all_storage_groups(tasks::task_info tablet_split_task_info) = 0;
|
||||
virtual future<> maybe_split_compaction_group_of(size_t idx) = 0;
|
||||
virtual future<std::vector<sstables::shared_sstable>> maybe_split_new_sstable(const sstables::shared_sstable& sst) = 0;
|
||||
virtual future<std::vector<sstables::shared_sstable>> maybe_split_sstable(const sstables::shared_sstable& sst) = 0;
|
||||
virtual dht::token_range get_token_range_after_split(const dht::token&) const noexcept = 0;
|
||||
|
||||
virtual lw_shared_ptr<sstables::sstable_set> make_sstable_set() const = 0;
|
||||
|
||||
@@ -604,28 +604,9 @@ public:
|
||||
|
||||
data_dictionary::table as_data_dictionary() const;
|
||||
|
||||
// The usage of these functions are restricted to preexisting sstables that aren't being
|
||||
// moved anywhere, so should never be used in the context of file streaming and intra
|
||||
// node migration. The only user today is distributed loader, which populates the
|
||||
// sstables for each column family on boot.
|
||||
future<> add_sstable_and_update_cache(sstables::shared_sstable sst,
|
||||
sstables::offstrategy offstrategy = sstables::offstrategy::no);
|
||||
future<> add_sstables_and_update_cache(const std::vector<sstables::shared_sstable>& ssts);
|
||||
|
||||
// Restricted to new sstables produced by external processes such as repair.
|
||||
// The sstable might undergo split if table is in split mode.
|
||||
// If no need for split, the input sstable will only be attached to the sstable set.
|
||||
// If split happens, the output sstables will be attached and the input sstable unlinked.
|
||||
// On failure, the input sstable is unlinked and exception propagated to the caller.
|
||||
// The on_add callback will be called on all sstables to be added into the set.
|
||||
[[nodiscard]] future<std::vector<sstables::shared_sstable>>
|
||||
add_new_sstable_and_update_cache(sstables::shared_sstable new_sst,
|
||||
std::function<future<>(sstables::shared_sstable)> on_add,
|
||||
sstables::offstrategy offstrategy = sstables::offstrategy::no);
|
||||
[[nodiscard]] future<std::vector<sstables::shared_sstable>>
|
||||
add_new_sstables_and_update_cache(std::vector<sstables::shared_sstable> new_ssts,
|
||||
std::function<future<>(sstables::shared_sstable)> on_add);
|
||||
|
||||
future<> move_sstables_from_staging(std::vector<sstables::shared_sstable>);
|
||||
sstables::shared_sstable make_sstable();
|
||||
void set_truncation_time(db_clock::time_point truncated_at) noexcept {
|
||||
@@ -743,9 +724,7 @@ private:
|
||||
return _config.enable_cache && _schema->caching_options().enabled();
|
||||
}
|
||||
void update_stats_for_new_sstable(const sstables::shared_sstable& sst) noexcept;
|
||||
// This function can throw even if the sstable was added into the set. When the sstable was successfully
|
||||
// added, the sstable ptr @sst will be set to nullptr. Allowing caller to optionally discard the sstable.
|
||||
future<> do_add_sstable_and_update_cache(compaction_group& cg, sstables::shared_sstable& sst, sstables::offstrategy, bool trigger_compaction);
|
||||
future<> do_add_sstable_and_update_cache(compaction_group& cg, sstables::shared_sstable sst, sstables::offstrategy, bool trigger_compaction);
|
||||
future<> do_add_sstable_and_update_cache(sstables::shared_sstable sst, sstables::offstrategy offstrategy, bool trigger_compaction);
|
||||
// Helpers which add sstable on behalf of a compaction group and refreshes compound set.
|
||||
void add_sstable(compaction_group& cg, sstables::shared_sstable sstable);
|
||||
@@ -1379,8 +1358,7 @@ public:
|
||||
|
||||
// Clones storage of a given tablet. Memtable is flushed first to guarantee that the
|
||||
// snapshot (list of sstables) will include all the data written up to the time it was taken.
|
||||
// If leave_unsealead is set, all the destination sstables will be left unsealed.
|
||||
future<utils::chunked_vector<sstables::entry_descriptor>> clone_tablet_storage(locator::tablet_id tid, bool leave_unsealed);
|
||||
future<utils::chunked_vector<sstables::entry_descriptor>> clone_tablet_storage(locator::tablet_id tid);
|
||||
|
||||
friend class compaction_group;
|
||||
friend class compaction::compaction_task_impl;
|
||||
|
||||
122
replica/table.cc
122
replica/table.cc
@@ -721,7 +721,7 @@ public:
|
||||
bool all_storage_groups_split() override { return true; }
|
||||
future<> split_all_storage_groups(tasks::task_info tablet_split_task_info) override { return make_ready_future(); }
|
||||
future<> maybe_split_compaction_group_of(size_t idx) override { return make_ready_future(); }
|
||||
future<std::vector<sstables::shared_sstable>> maybe_split_new_sstable(const sstables::shared_sstable& sst) override {
|
||||
future<std::vector<sstables::shared_sstable>> maybe_split_sstable(const sstables::shared_sstable& sst) override {
|
||||
return make_ready_future<std::vector<sstables::shared_sstable>>(std::vector<sstables::shared_sstable>{sst});
|
||||
}
|
||||
dht::token_range get_token_range_after_split(const dht::token&) const noexcept override { return dht::token_range(); }
|
||||
@@ -879,7 +879,7 @@ public:
|
||||
bool all_storage_groups_split() override;
|
||||
future<> split_all_storage_groups(tasks::task_info tablet_split_task_info) override;
|
||||
future<> maybe_split_compaction_group_of(size_t idx) override;
|
||||
future<std::vector<sstables::shared_sstable>> maybe_split_new_sstable(const sstables::shared_sstable& sst) override;
|
||||
future<std::vector<sstables::shared_sstable>> maybe_split_sstable(const sstables::shared_sstable& sst) override;
|
||||
dht::token_range get_token_range_after_split(const dht::token& token) const noexcept override {
|
||||
return tablet_map().get_token_range_after_split(token);
|
||||
}
|
||||
@@ -1130,8 +1130,7 @@ future<> tablet_storage_group_manager::maybe_split_compaction_group_of(size_t id
|
||||
}
|
||||
|
||||
future<std::vector<sstables::shared_sstable>>
|
||||
tablet_storage_group_manager::maybe_split_new_sstable(const sstables::shared_sstable& sst) {
|
||||
co_await utils::get_local_injector().inject("maybe_split_new_sstable_wait", utils::wait_for_message(120s));
|
||||
tablet_storage_group_manager::maybe_split_sstable(const sstables::shared_sstable& sst) {
|
||||
if (!tablet_map().needs_split()) {
|
||||
co_return std::vector<sstables::shared_sstable>{sst};
|
||||
}
|
||||
@@ -1139,7 +1138,8 @@ tablet_storage_group_manager::maybe_split_new_sstable(const sstables::shared_sst
|
||||
auto& cg = compaction_group_for_sstable(sst);
|
||||
auto holder = cg.async_gate().hold();
|
||||
auto& view = cg.view_for_sstable(sst);
|
||||
co_return co_await _t.get_compaction_manager().maybe_split_new_sstable(sst, view, co_await split_compaction_options());
|
||||
auto lock_holder = co_await _t.get_compaction_manager().get_incremental_repair_read_lock(view, "maybe_split_sstable");
|
||||
co_return co_await _t.get_compaction_manager().maybe_split_sstable(sst, view, co_await split_compaction_options());
|
||||
}
|
||||
|
||||
future<> table::maybe_split_compaction_group_of(locator::tablet_id tablet_id) {
|
||||
@@ -1149,7 +1149,7 @@ future<> table::maybe_split_compaction_group_of(locator::tablet_id tablet_id) {
|
||||
|
||||
future<std::vector<sstables::shared_sstable>> table::maybe_split_new_sstable(const sstables::shared_sstable& sst) {
|
||||
auto holder = async_gate().hold();
|
||||
co_return co_await _sg_manager->maybe_split_new_sstable(sst);
|
||||
co_return co_await _sg_manager->maybe_split_sstable(sst);
|
||||
}
|
||||
|
||||
dht::token_range table::get_token_range_after_split(const dht::token& token) const noexcept {
|
||||
@@ -1330,7 +1330,7 @@ future<utils::chunked_vector<sstables::shared_sstable>> table::take_sstable_set_
|
||||
}
|
||||
|
||||
future<utils::chunked_vector<sstables::entry_descriptor>>
|
||||
table::clone_tablet_storage(locator::tablet_id tid, bool leave_unsealed) {
|
||||
table::clone_tablet_storage(locator::tablet_id tid) {
|
||||
utils::chunked_vector<sstables::entry_descriptor> ret;
|
||||
auto holder = async_gate().hold();
|
||||
|
||||
@@ -1342,7 +1342,7 @@ table::clone_tablet_storage(locator::tablet_id tid, bool leave_unsealed) {
|
||||
// by compaction while we are waiting for the lock.
|
||||
auto deletion_guard = co_await get_sstable_list_permit();
|
||||
co_await sg.make_sstable_set()->for_each_sstable_gently([&] (const sstables::shared_sstable& sst) -> future<> {
|
||||
ret.push_back(co_await sst->clone(calculate_generation_for_new_table(), leave_unsealed));
|
||||
ret.push_back(co_await sst->clone(calculate_generation_for_new_table()));
|
||||
});
|
||||
co_return ret;
|
||||
}
|
||||
@@ -1354,10 +1354,10 @@ void table::update_stats_for_new_sstable(const sstables::shared_sstable& sst) no
|
||||
}
|
||||
|
||||
future<>
|
||||
table::do_add_sstable_and_update_cache(compaction_group& cg, sstables::shared_sstable& sst, sstables::offstrategy offstrategy,
|
||||
table::do_add_sstable_and_update_cache(compaction_group& cg, sstables::shared_sstable sst, sstables::offstrategy offstrategy,
|
||||
bool trigger_compaction) {
|
||||
auto permit = co_await seastar::get_units(_sstable_set_mutation_sem, 1);
|
||||
co_return co_await get_row_cache().invalidate(row_cache::external_updater([&] () mutable noexcept {
|
||||
co_return co_await get_row_cache().invalidate(row_cache::external_updater([&] () noexcept {
|
||||
// FIXME: this is not really noexcept, but we need to provide strong exception guarantees.
|
||||
// atomically load all opened sstables into column family.
|
||||
if (!offstrategy) {
|
||||
@@ -1369,8 +1369,6 @@ table::do_add_sstable_and_update_cache(compaction_group& cg, sstables::shared_ss
|
||||
if (trigger_compaction) {
|
||||
try_trigger_compaction(cg);
|
||||
}
|
||||
// Reseting sstable ptr to inform the caller the sstable has been loaded successfully.
|
||||
sst = nullptr;
|
||||
}), dht::partition_range::make({sst->get_first_decorated_key(), true}, {sst->get_last_decorated_key(), true}), [sst, schema = _schema] (const dht::decorated_key& key) {
|
||||
return sst->filter_has_key(sstables::key::from_partition_key(*schema, key.key()));
|
||||
});
|
||||
@@ -1378,10 +1376,12 @@ table::do_add_sstable_and_update_cache(compaction_group& cg, sstables::shared_ss
|
||||
|
||||
future<>
|
||||
table::do_add_sstable_and_update_cache(sstables::shared_sstable new_sst, sstables::offstrategy offstrategy, bool trigger_compaction) {
|
||||
auto& cg = compaction_group_for_sstable(new_sst);
|
||||
// Hold gate to make share compaction group is alive.
|
||||
auto holder = cg.async_gate().hold();
|
||||
co_await do_add_sstable_and_update_cache(cg, new_sst, offstrategy, trigger_compaction);
|
||||
for (auto sst : co_await maybe_split_new_sstable(new_sst)) {
|
||||
auto& cg = compaction_group_for_sstable(sst);
|
||||
// Hold gate to make share compaction group is alive.
|
||||
auto holder = cg.async_gate().hold();
|
||||
co_await do_add_sstable_and_update_cache(cg, std::move(sst), offstrategy, trigger_compaction);
|
||||
}
|
||||
}
|
||||
|
||||
future<>
|
||||
@@ -1399,85 +1399,6 @@ table::add_sstables_and_update_cache(const std::vector<sstables::shared_sstable>
|
||||
trigger_compaction();
|
||||
}
|
||||
|
||||
future<std::vector<sstables::shared_sstable>>
|
||||
table::add_new_sstable_and_update_cache(sstables::shared_sstable new_sst,
|
||||
std::function<future<>(sstables::shared_sstable)> on_add,
|
||||
sstables::offstrategy offstrategy) {
|
||||
std::vector<sstables::shared_sstable> ret, ssts;
|
||||
std::exception_ptr ex;
|
||||
try {
|
||||
bool trigger_compaction = offstrategy == sstables::offstrategy::no;
|
||||
auto& cg = compaction_group_for_sstable(new_sst);
|
||||
// This prevents compaction group from being considered empty until the holder is released.
|
||||
// Helpful for tablet split, where split is acked for a table when all pre-split groups are empty.
|
||||
auto sstable_add_holder = cg.sstable_add_gate().hold();
|
||||
|
||||
ret = ssts = co_await maybe_split_new_sstable(new_sst);
|
||||
// on sucessful split, input sstable is unlinked.
|
||||
new_sst = nullptr;
|
||||
for (auto& sst : ssts) {
|
||||
auto& cg = compaction_group_for_sstable(sst);
|
||||
// Hold gate to make sure compaction group is alive.
|
||||
auto holder = cg.async_gate().hold();
|
||||
co_await on_add(sst);
|
||||
// If do_add_sstable_and_update_cache() throws after sstable has been loaded, the pointer
|
||||
// sst passed by reference will be set to nullptr, so it won't be unlinked in the exception
|
||||
// handler below.
|
||||
co_await do_add_sstable_and_update_cache(cg, sst, offstrategy, trigger_compaction);
|
||||
sst = nullptr;
|
||||
}
|
||||
} catch (...) {
|
||||
ex = std::current_exception();
|
||||
}
|
||||
|
||||
if (ex) {
|
||||
// on failed split, input sstable is unlinked here.
|
||||
if (new_sst) {
|
||||
tlogger.error("Failed to load SSTable {} of origin {} due to {}, it will be unlinked...", new_sst->get_filename(), new_sst->get_origin(), ex);
|
||||
co_await new_sst->unlink();
|
||||
}
|
||||
// on failure after sucessful split, sstables not attached yet will be unlinked
|
||||
co_await coroutine::parallel_for_each(ssts, [&ex] (sstables::shared_sstable sst) -> future<> {
|
||||
if (sst) {
|
||||
tlogger.error("Failed to load SSTable {} of origin {} due to {}, it will be unlinked...", sst->get_filename(), sst->get_origin(), ex);
|
||||
co_await sst->unlink();
|
||||
}
|
||||
});
|
||||
co_await coroutine::return_exception_ptr(std::move(ex));
|
||||
}
|
||||
co_return std::move(ret);
|
||||
}
|
||||
|
||||
future<std::vector<sstables::shared_sstable>>
|
||||
table::add_new_sstables_and_update_cache(std::vector<sstables::shared_sstable> new_ssts,
|
||||
std::function<future<>(sstables::shared_sstable)> on_add) {
|
||||
std::exception_ptr ex;
|
||||
std::vector<sstables::shared_sstable> ret;
|
||||
|
||||
// We rely on add_new_sstable_and_update_cache() to unlink the sstable feeded into it,
|
||||
// so the exception handling below will only have to unlink sstables not processed yet.
|
||||
try {
|
||||
for (auto& sst: new_ssts) {
|
||||
auto ssts = co_await add_new_sstable_and_update_cache(std::exchange(sst, nullptr), on_add);
|
||||
std::ranges::move(ssts, std::back_inserter(ret));
|
||||
|
||||
}
|
||||
} catch (...) {
|
||||
ex = std::current_exception();
|
||||
}
|
||||
|
||||
if (ex) {
|
||||
co_await coroutine::parallel_for_each(new_ssts, [&ex] (sstables::shared_sstable sst) -> future<> {
|
||||
if (sst) {
|
||||
tlogger.error("Failed to load SSTable {} of origin {} due to {}, it will be unlinked...", sst->get_filename(), sst->get_origin(), ex);
|
||||
co_await sst->unlink();
|
||||
}
|
||||
});
|
||||
co_await coroutine::return_exception_ptr(std::move(ex));
|
||||
}
|
||||
co_return std::move(ret);
|
||||
}
|
||||
|
||||
future<>
|
||||
table::update_cache(compaction_group& cg, lw_shared_ptr<memtable> m, std::vector<sstables::shared_sstable> ssts) {
|
||||
auto permit = co_await seastar::get_units(_sstable_set_mutation_sem, 1);
|
||||
@@ -2691,8 +2612,8 @@ public:
|
||||
sstables::sstables_manager& get_sstables_manager() noexcept override {
|
||||
return _t.get_sstables_manager();
|
||||
}
|
||||
sstables::shared_sstable make_sstable(sstables::sstable_state state) const override {
|
||||
return _t.make_sstable(state);
|
||||
sstables::shared_sstable make_sstable() const override {
|
||||
return _t.make_sstable();
|
||||
}
|
||||
sstables::sstable_writer_config configure_writer(sstring origin) const override {
|
||||
auto cfg = _t.get_sstables_manager().configure_writer(std::move(origin));
|
||||
@@ -2810,7 +2731,6 @@ future<> compaction_group::stop(sstring reason) noexcept {
|
||||
auto flush_future = co_await seastar::coroutine::as_future(flush());
|
||||
|
||||
co_await _flush_gate.close();
|
||||
co_await _sstable_add_gate.close();
|
||||
// FIXME: indentation
|
||||
_compaction_disabler_for_views.clear();
|
||||
co_await utils::get_local_injector().inject("compaction_group_stop_wait", utils::wait_for_message(60s));
|
||||
@@ -2824,7 +2744,7 @@ future<> compaction_group::stop(sstring reason) noexcept {
|
||||
}
|
||||
|
||||
bool compaction_group::empty() const noexcept {
|
||||
return _memtables->empty() && live_sstable_count() == 0 && _sstable_add_gate.get_count() == 0;
|
||||
return _memtables->empty() && live_sstable_count() == 0;
|
||||
}
|
||||
|
||||
const schema_ptr& compaction_group::schema() const {
|
||||
@@ -3280,7 +3200,7 @@ db::replay_position table::highest_flushed_replay_position() const {
|
||||
}
|
||||
|
||||
struct manifest_json : public json::json_base {
|
||||
json::json_chunked_list<std::string_view> files;
|
||||
json::json_chunked_list<sstring> files;
|
||||
|
||||
manifest_json() {
|
||||
register_params();
|
||||
@@ -3304,7 +3224,7 @@ table::seal_snapshot(sstring jsondir, std::vector<snapshot_file_set> file_sets)
|
||||
manifest_json manifest;
|
||||
for (const auto& fsp : file_sets) {
|
||||
for (auto& rf : *fsp) {
|
||||
manifest.files.push(std::string_view(rf));
|
||||
manifest.files.push(std::move(rf));
|
||||
}
|
||||
}
|
||||
auto streamer = json::stream_object(std::move(manifest));
|
||||
|
||||
@@ -390,11 +390,9 @@ dark_green = (195, 215, 195)
|
||||
light_red = (255, 200, 200)
|
||||
light_green = (200, 255, 200)
|
||||
light_gray = (240, 240, 240)
|
||||
scylla_blue = (87, 209, 229)
|
||||
|
||||
tablet_colors = {
|
||||
(Tablet.STATE_NORMAL, None): GRAY,
|
||||
(Tablet.STATE_NORMAL, 'repair'): scylla_blue,
|
||||
(Tablet.STATE_JOINING, 'allow_write_both_read_old'): dark_green,
|
||||
(Tablet.STATE_LEAVING, 'allow_write_both_read_old'): dark_red,
|
||||
(Tablet.STATE_JOINING, 'write_both_read_old'): dark_green,
|
||||
@@ -534,8 +532,6 @@ def update_from_cql(initial=False):
|
||||
state = (Tablet.STATE_JOINING, tablet.stage)
|
||||
elif replica in leaving:
|
||||
state = (Tablet.STATE_LEAVING, tablet.stage)
|
||||
elif tablet.stage == 'repair':
|
||||
state = (Tablet.STATE_NORMAL, tablet.stage)
|
||||
else:
|
||||
state = (Tablet.STATE_NORMAL, None)
|
||||
|
||||
|
||||
@@ -224,13 +224,7 @@ future<> service::client_state::has_access(const sstring& ks, auth::command_desc
|
||||
ks + " can be granted only SELECT or DESCRIBE permissions to a non-superuser.");
|
||||
}
|
||||
|
||||
static const std::unordered_set<auth::resource> vector_search_system_resources = {
|
||||
auth::make_data_resource(db::system_keyspace::NAME, db::system_keyspace::GROUP0_HISTORY),
|
||||
auth::make_data_resource(db::system_keyspace::NAME, db::system_keyspace::VERSIONS),
|
||||
};
|
||||
|
||||
if ((cmd.resource.kind() == auth::resource_kind::data && cmd.permission == auth::permission::SELECT && is_vector_indexed.has_value() && is_vector_indexed.value()) ||
|
||||
(cmd.permission == auth::permission::SELECT && vector_search_system_resources.contains(cmd.resource))) {
|
||||
if (cmd.resource.kind() == auth::resource_kind::data && cmd.permission == auth::permission::SELECT && is_vector_indexed.has_value() && is_vector_indexed.value()) {
|
||||
|
||||
co_return co_await ensure_has_permission<auth::command_desc_with_permission_set>({auth::permission_set::of<auth::permission::SELECT, auth::permission::VECTOR_SEARCH_INDEXING>(), cmd.resource});
|
||||
|
||||
|
||||
@@ -6526,19 +6526,14 @@ future<> storage_service::clone_locally_tablet_storage(locator::global_tablet_id
|
||||
leaving.host, pending.host));
|
||||
}
|
||||
|
||||
// All sstables cloned locally will be left unsealed, until they're loaded into the table.
|
||||
// This is to guarantee no unsplit sstables will be left sealed on disk, which could
|
||||
// cause problems if unsplit sstables are found after split was ACKed to coordinator.
|
||||
bool leave_unsealed = true;
|
||||
|
||||
auto d = co_await smp::submit_to(leaving.shard, [this, tablet, leave_unsealed] () -> future<utils::chunked_vector<sstables::entry_descriptor>> {
|
||||
auto d = co_await smp::submit_to(leaving.shard, [this, tablet] () -> future<utils::chunked_vector<sstables::entry_descriptor>> {
|
||||
auto& table = _db.local().find_column_family(tablet.table);
|
||||
auto op = table.stream_in_progress();
|
||||
co_return co_await table.clone_tablet_storage(tablet.tablet, leave_unsealed);
|
||||
co_return co_await table.clone_tablet_storage(tablet.tablet);
|
||||
});
|
||||
rtlogger.debug("Cloned storage of tablet {} from leaving replica {}, {} sstables were found", tablet, leaving, d.size());
|
||||
|
||||
auto load_sstable = [leave_unsealed] (const dht::sharder& sharder, replica::table& t, sstables::entry_descriptor d) -> future<sstables::shared_sstable> {
|
||||
auto load_sstable = [] (const dht::sharder& sharder, replica::table& t, sstables::entry_descriptor d) -> future<sstables::shared_sstable> {
|
||||
auto& mng = t.get_sstables_manager();
|
||||
auto sst = mng.make_sstable(t.schema(), t.get_storage_options(), d.generation, d.state.value_or(sstables::sstable_state::normal),
|
||||
d.version, d.format, db_clock::now(), default_io_error_handler_gen());
|
||||
@@ -6546,8 +6541,7 @@ future<> storage_service::clone_locally_tablet_storage(locator::global_tablet_id
|
||||
// will still point to leaving replica at this stage in migration. If node goes down,
|
||||
// SSTables will be loaded at pending replica and migration is retried, so correctness
|
||||
// wise, we're good.
|
||||
auto cfg = sstables::sstable_open_config{ .current_shard_as_sstable_owner = true,
|
||||
.unsealed_sstable = leave_unsealed };
|
||||
auto cfg = sstables::sstable_open_config{ .current_shard_as_sstable_owner = true };
|
||||
co_await sst->load(sharder, cfg);
|
||||
co_return sst;
|
||||
};
|
||||
@@ -6555,23 +6549,16 @@ future<> storage_service::clone_locally_tablet_storage(locator::global_tablet_id
|
||||
co_await smp::submit_to(pending.shard, [this, tablet, load_sstable, d = std::move(d)] () mutable -> future<> {
|
||||
// Loads cloned sstables from leaving replica into pending one.
|
||||
auto& table = _db.local().find_column_family(tablet.table);
|
||||
auto& sstm = table.get_sstables_manager();
|
||||
auto op = table.stream_in_progress();
|
||||
dht::auto_refreshing_sharder sharder(table.shared_from_this());
|
||||
|
||||
std::unordered_set<sstables::shared_sstable> ssts;
|
||||
std::vector<sstables::shared_sstable> ssts;
|
||||
ssts.reserve(d.size());
|
||||
for (auto&& sst_desc : d) {
|
||||
ssts.insert(co_await load_sstable(sharder, table, std::move(sst_desc)));
|
||||
ssts.push_back(co_await load_sstable(sharder, table, std::move(sst_desc)));
|
||||
}
|
||||
auto on_add = [&ssts, &sstm] (sstables::shared_sstable loading_sst) -> future<> {
|
||||
if (ssts.contains(loading_sst)) {
|
||||
auto cfg = sstm.configure_writer(loading_sst->get_origin());
|
||||
co_await loading_sst->seal_sstable(cfg.backup);
|
||||
}
|
||||
co_return;
|
||||
};
|
||||
auto loaded_ssts = co_await table.add_new_sstables_and_update_cache(std::vector(ssts.begin(), ssts.end()), on_add);
|
||||
_view_building_worker.local().load_sstables(tablet.table, loaded_ssts);
|
||||
co_await table.add_sstables_and_update_cache(ssts);
|
||||
_view_building_worker.local().load_sstables(tablet.table, ssts);
|
||||
});
|
||||
rtlogger.debug("Successfully loaded storage of tablet {} into pending replica {}", tablet, pending);
|
||||
}
|
||||
|
||||
@@ -1931,10 +1931,6 @@ public:
|
||||
const auto& table_groups = _tm->tablets().all_table_groups();
|
||||
|
||||
auto finalize_decision = [&] {
|
||||
if (utils::get_local_injector().enter("tablet_resize_finalization_postpone")) {
|
||||
return;
|
||||
}
|
||||
|
||||
_stats.for_cluster().resizes_finalized++;
|
||||
resize_plan.finalize_resize.insert(table);
|
||||
};
|
||||
|
||||
@@ -2623,10 +2623,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
|
||||
co_await _voter_handler.on_node_removed(replaced_node_id, _as);
|
||||
}
|
||||
}
|
||||
utils::get_local_injector().inject("crash_coordinator_before_stream", [] {
|
||||
rtlogger.info("crash_coordinator_before_stream: aborting");
|
||||
abort();
|
||||
});
|
||||
utils::get_local_injector().inject("crash_coordinator_before_stream", [] { abort(); });
|
||||
raft_topology_cmd cmd{raft_topology_cmd::command::stream_ranges};
|
||||
auto state = node.rs->state;
|
||||
try {
|
||||
|
||||
@@ -1696,9 +1696,7 @@ void writer::consume_end_of_stream() {
|
||||
.map = _collector.get_ext_timestamp_stats()
|
||||
});
|
||||
_sst.write_scylla_metadata(_shard, std::move(identifier), std::move(ld_stats), std::move(ts_stats));
|
||||
if (!_cfg.leave_unsealed) {
|
||||
_sst.seal_sstable(_cfg.backup).get();
|
||||
}
|
||||
_sst.seal_sstable(_cfg.backup).get();
|
||||
}
|
||||
|
||||
uint64_t writer::data_file_position_for_tests() const {
|
||||
|
||||
@@ -83,8 +83,6 @@ struct sstable_open_config {
|
||||
bool current_shard_as_sstable_owner = false;
|
||||
// Do not move the sharding metadata to the sharder, keeping it in the scylla metadata..
|
||||
bool keep_sharding_metadata = false;
|
||||
// Allows unsealed sstable to be loaded, since it must read components from temporary TOC instead.
|
||||
bool unsealed_sstable = false;
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
@@ -836,14 +836,13 @@ future<std::vector<sstring>> sstable::read_and_parse_toc(file f) {
|
||||
|
||||
// This is small enough, and well-defined. Easier to just read it all
|
||||
// at once
|
||||
future<> sstable::read_toc(sstable_open_config cfg) noexcept {
|
||||
future<> sstable::read_toc() noexcept {
|
||||
if (_recognized_components.size()) {
|
||||
co_return;
|
||||
}
|
||||
|
||||
try {
|
||||
auto toc_type = cfg.unsealed_sstable ? component_type::TemporaryTOC : component_type::TOC;
|
||||
co_await do_read_simple(toc_type, [&] (version_types v, file f) -> future<> {
|
||||
co_await do_read_simple(component_type::TOC, [&] (version_types v, file f) -> future<> {
|
||||
auto comps = co_await read_and_parse_toc(f);
|
||||
for (auto& c: comps) {
|
||||
// accept trailing newlines
|
||||
@@ -901,8 +900,8 @@ future<std::unordered_map<component_type, file>> sstable::readable_file_for_all_
|
||||
co_return std::move(files);
|
||||
}
|
||||
|
||||
future<entry_descriptor> sstable::clone(generation_type new_generation, bool leave_unsealed) const {
|
||||
co_await _storage->snapshot(*this, _storage->prefix(), storage::absolute_path::yes, new_generation, storage::leave_unsealed(leave_unsealed));
|
||||
future<entry_descriptor> sstable::clone(generation_type new_generation) const {
|
||||
co_await _storage->snapshot(*this, _storage->prefix(), storage::absolute_path::yes, new_generation);
|
||||
co_return entry_descriptor(new_generation, _version, _format, component_type::TOC, _state);
|
||||
}
|
||||
|
||||
@@ -1726,7 +1725,7 @@ void sstable::disable_component_memory_reload() {
|
||||
}
|
||||
|
||||
future<> sstable::load_metadata(sstable_open_config cfg) noexcept {
|
||||
co_await read_toc(cfg);
|
||||
co_await read_toc();
|
||||
// read scylla-meta after toc. Might need it to parse
|
||||
// rest (hint extensions)
|
||||
co_await read_scylla_metadata();
|
||||
@@ -3961,13 +3960,11 @@ class sstable_stream_sink_impl : public sstable_stream_sink {
|
||||
shared_sstable _sst;
|
||||
component_type _type;
|
||||
bool _last_component;
|
||||
bool _leave_unsealed;
|
||||
public:
|
||||
sstable_stream_sink_impl(shared_sstable sst, component_type type, sstable_stream_sink_cfg cfg)
|
||||
sstable_stream_sink_impl(shared_sstable sst, component_type type, bool last_component)
|
||||
: _sst(std::move(sst))
|
||||
, _type(type)
|
||||
, _last_component(cfg.last_component)
|
||||
, _leave_unsealed(cfg.leave_unsealed)
|
||||
, _last_component(last_component)
|
||||
{}
|
||||
private:
|
||||
future<> load_metadata() const {
|
||||
@@ -4014,12 +4011,10 @@ public:
|
||||
|
||||
co_return co_await make_file_output_stream(std::move(f), stream_options);
|
||||
}
|
||||
future<shared_sstable> close() override {
|
||||
future<shared_sstable> close_and_seal() override {
|
||||
if (_last_component) {
|
||||
// If we are the last component in a sequence, we can seal the table.
|
||||
if (!_leave_unsealed) {
|
||||
co_await _sst->_storage->seal(*_sst);
|
||||
}
|
||||
co_await _sst->_storage->seal(*_sst);
|
||||
co_return std::move(_sst);
|
||||
}
|
||||
_sst = {};
|
||||
@@ -4036,7 +4031,7 @@ public:
|
||||
}
|
||||
};
|
||||
|
||||
std::unique_ptr<sstable_stream_sink> create_stream_sink(schema_ptr schema, sstables_manager& sstm, const data_dictionary::storage_options& s_opts, sstable_state state, std::string_view component_filename, sstable_stream_sink_cfg cfg) {
|
||||
std::unique_ptr<sstable_stream_sink> create_stream_sink(schema_ptr schema, sstables_manager& sstm, const data_dictionary::storage_options& s_opts, sstable_state state, std::string_view component_filename, bool last_component) {
|
||||
auto desc = parse_path(component_filename, schema->ks_name(), schema->cf_name());
|
||||
auto sst = sstm.make_sstable(schema, s_opts, desc.generation, state, desc.version, desc.format);
|
||||
|
||||
@@ -4047,7 +4042,7 @@ std::unique_ptr<sstable_stream_sink> create_stream_sink(schema_ptr schema, sstab
|
||||
type = component_type::TemporaryTOC;
|
||||
}
|
||||
|
||||
return std::make_unique<sstable_stream_sink_impl>(std::move(sst), type, cfg);
|
||||
return std::make_unique<sstable_stream_sink_impl>(std::move(sst), type, last_component);
|
||||
}
|
||||
|
||||
generation_type
|
||||
|
||||
@@ -109,7 +109,6 @@ struct sstable_writer_config {
|
||||
size_t promoted_index_auto_scale_threshold;
|
||||
uint64_t max_sstable_size = std::numeric_limits<uint64_t>::max();
|
||||
bool backup = false;
|
||||
bool leave_unsealed = false;
|
||||
mutation_fragment_stream_validation_level validation_level;
|
||||
std::optional<db::replay_position> replay_position;
|
||||
std::optional<int> sstable_level;
|
||||
@@ -418,8 +417,8 @@ public:
|
||||
return component_basename(_schema->ks_name(), _schema->cf_name(), _version, _generation, _format, f);
|
||||
}
|
||||
|
||||
component_name get_filename(component_type f = component_type::Data) const {
|
||||
return component_name(*this, f);
|
||||
component_name get_filename() const {
|
||||
return component_name(*this, component_type::Data);
|
||||
}
|
||||
|
||||
component_name toc_filename() const {
|
||||
@@ -694,7 +693,7 @@ private:
|
||||
|
||||
future<> update_info_for_opened_data(sstable_open_config cfg = {});
|
||||
|
||||
future<> read_toc(sstable_open_config cfg = {}) noexcept;
|
||||
future<> read_toc() noexcept;
|
||||
future<> read_summary() noexcept;
|
||||
|
||||
void write_summary() {
|
||||
@@ -1070,9 +1069,8 @@ public:
|
||||
future<std::unordered_map<component_type, file>> readable_file_for_all_components() const;
|
||||
|
||||
// Clones this sstable with a new generation, under the same location as the original one.
|
||||
// If leave_unsealed is true, the destination sstable is left unsealed.
|
||||
// Implementation is underlying storage specific.
|
||||
future<entry_descriptor> clone(generation_type new_generation, bool leave_unsealed = false) const;
|
||||
future<entry_descriptor> clone(generation_type new_generation) const;
|
||||
|
||||
struct lesser_reclaimed_memory {
|
||||
// comparator class to be used by the _reclaimed set in sstables manager
|
||||
@@ -1246,18 +1244,13 @@ public:
|
||||
// closes this component. If this is the last component in a set (see "last_component" in creating method below)
|
||||
// the table on disk will be sealed.
|
||||
// Returns sealed sstable if last, or nullptr otherwise.
|
||||
virtual future<shared_sstable> close() = 0;
|
||||
virtual future<shared_sstable> close_and_seal() = 0;
|
||||
virtual future<> abort() = 0;
|
||||
};
|
||||
|
||||
struct sstable_stream_sink_cfg {
|
||||
bool last_component = false;
|
||||
bool leave_unsealed = false;
|
||||
};
|
||||
|
||||
// Creates a sink object which can receive a component file sourced from above source object data.
|
||||
|
||||
std::unique_ptr<sstable_stream_sink> create_stream_sink(schema_ptr, sstables_manager&, const data_dictionary::storage_options&, sstable_state, std::string_view component_filename, sstable_stream_sink_cfg cfg);
|
||||
std::unique_ptr<sstable_stream_sink> create_stream_sink(schema_ptr, sstables_manager&, const data_dictionary::storage_options&, sstable_state, std::string_view component_filename, bool last_component);
|
||||
|
||||
} // namespace sstables
|
||||
|
||||
|
||||
@@ -50,14 +50,7 @@ class filesystem_storage final : public sstables::storage {
|
||||
std::optional<std::filesystem::path> _temp_dir; // Valid while the sstable is being created, until sealed
|
||||
|
||||
private:
|
||||
struct mark_for_removal_tag {};
|
||||
struct leave_unsealed_tag {};
|
||||
|
||||
enum class link_mode {
|
||||
default_mode,
|
||||
mark_for_removal,
|
||||
leave_unsealed,
|
||||
};
|
||||
using mark_for_removal = bool_class<class mark_for_removal_tag>;
|
||||
|
||||
template <typename Comp>
|
||||
requires std::is_same_v<Comp, component_type> || std::is_same_v<Comp, sstring>
|
||||
@@ -68,9 +61,7 @@ private:
|
||||
future<> check_create_links_replay(const sstable& sst, const sstring& dst_dir, generation_type dst_gen, const std::vector<std::pair<sstables::component_type, sstring>>& comps) const;
|
||||
future<> remove_temp_dir();
|
||||
virtual future<> create_links(const sstable& sst, const std::filesystem::path& dir) const override;
|
||||
future<> create_links_common(const sstable& sst, sstring dst_dir, generation_type dst_gen, link_mode mode) const;
|
||||
future<> create_links_common(const sstable& sst, sstring dst_dir, generation_type dst_gen, mark_for_removal_tag) const;
|
||||
future<> create_links_common(const sstable& sst, const std::filesystem::path& dir, std::optional<generation_type> gen, leave_unsealed_tag) const;
|
||||
future<> create_links_common(const sstable& sst, sstring dst_dir, generation_type dst_gen, mark_for_removal mark_for_removal) const;
|
||||
future<> create_links_common(const sstable& sst, const std::filesystem::path& dir, std::optional<generation_type> dst_gen) const;
|
||||
future<> touch_temp_dir(const sstable& sst);
|
||||
future<> move(const sstable& sst, sstring new_dir, generation_type generation, delayed_commit_changes* delay) override;
|
||||
@@ -92,7 +83,7 @@ public:
|
||||
{}
|
||||
|
||||
virtual future<> seal(const sstable& sst) override;
|
||||
virtual future<> snapshot(const sstable& sst, sstring dir, absolute_path abs, std::optional<generation_type> gen, storage::leave_unsealed) const override;
|
||||
virtual future<> snapshot(const sstable& sst, sstring dir, absolute_path abs, std::optional<generation_type> gen) const override;
|
||||
virtual future<> change_state(const sstable& sst, sstable_state state, generation_type generation, delayed_commit_changes* delay) override;
|
||||
// runs in async context
|
||||
virtual void open(sstable& sst) override;
|
||||
@@ -365,13 +356,8 @@ future<> filesystem_storage::check_create_links_replay(const sstable& sst, const
|
||||
/// \param sst - the sstable to work on
|
||||
/// \param dst_dir - the destination directory.
|
||||
/// \param generation - the generation of the destination sstable
|
||||
/// \param mode - what will be done after all components were linked
|
||||
/// mark_for_removal - mark the sstable for removal after linking it to the destination dst_dir
|
||||
/// leave_unsealed - leaves the destination sstable unsealed
|
||||
future<> filesystem_storage::create_links_common(const sstable& sst, sstring dst_dir, generation_type generation, link_mode mode) const {
|
||||
// They're mutually exclusive, so we can assume only one is set.
|
||||
bool mark_for_removal = mode == link_mode::mark_for_removal;
|
||||
bool leave_unsealed = mode == link_mode::leave_unsealed;
|
||||
/// \param mark_for_removal - mark the sstable for removal after linking it to the destination dst_dir
|
||||
future<> filesystem_storage::create_links_common(const sstable& sst, sstring dst_dir, generation_type generation, mark_for_removal mark_for_removal) const {
|
||||
sstlog.trace("create_links: {} -> {} generation={} mark_for_removal={}", sst.get_filename(), dst_dir, generation, mark_for_removal);
|
||||
auto comps = sst.all_components();
|
||||
co_await check_create_links_replay(sst, dst_dir, generation, comps);
|
||||
@@ -380,11 +366,7 @@ future<> filesystem_storage::create_links_common(const sstable& sst, sstring dst
|
||||
co_await sst.sstable_write_io_check(idempotent_link_file, fmt::to_string(sst.filename(component_type::TOC)), std::move(dst));
|
||||
auto dir = opened_directory(dst_dir);
|
||||
co_await dir.sync(sst._write_error_handler);
|
||||
co_await parallel_for_each(comps, [this, &sst, &dst_dir, generation, leave_unsealed] (auto p) {
|
||||
// Skips the linking of TOC file if the destination will be left unsealed.
|
||||
if (leave_unsealed && p.first == component_type::TOC) {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
co_await parallel_for_each(comps, [this, &sst, &dst_dir, generation] (auto p) {
|
||||
auto src = filename(sst, _dir.native(), sst._generation, p.second);
|
||||
auto dst = filename(sst, dst_dir, generation, p.second);
|
||||
return sst.sstable_write_io_check(idempotent_link_file, std::move(src), std::move(dst));
|
||||
@@ -397,10 +379,9 @@ future<> filesystem_storage::create_links_common(const sstable& sst, sstring dst
|
||||
auto src_temp_toc = filename(sst, _dir.native(), sst._generation, component_type::TemporaryTOC);
|
||||
co_await sst.sstable_write_io_check(rename_file, std::move(dst_temp_toc), std::move(src_temp_toc));
|
||||
co_await _dir.sync(sst._write_error_handler);
|
||||
} else if (!leave_unsealed) {
|
||||
} else {
|
||||
// Now that the source sstable is linked to dir, remove
|
||||
// the TemporaryTOC file at the destination.
|
||||
// This is bypassed if destination will be left unsealed.
|
||||
co_await sst.sstable_write_io_check(remove_file, std::move(dst_temp_toc));
|
||||
}
|
||||
co_await dir.sync(sst._write_error_handler);
|
||||
@@ -408,23 +389,15 @@ future<> filesystem_storage::create_links_common(const sstable& sst, sstring dst
|
||||
sstlog.trace("create_links: {} -> {} generation={}: done", sst.get_filename(), dst_dir, generation);
|
||||
}
|
||||
|
||||
future<> filesystem_storage::create_links_common(const sstable& sst, sstring dst_dir, generation_type dst_gen, mark_for_removal_tag) const {
|
||||
return create_links_common(sst, dst_dir, dst_gen, link_mode::mark_for_removal);
|
||||
}
|
||||
|
||||
future<> filesystem_storage::create_links_common(const sstable& sst, const std::filesystem::path& dir, std::optional<generation_type> gen, leave_unsealed_tag) const {
|
||||
return create_links_common(sst, dir.native(), gen.value_or(sst._generation), link_mode::leave_unsealed);
|
||||
}
|
||||
|
||||
future<> filesystem_storage::create_links_common(const sstable& sst, const std::filesystem::path& dir, std::optional<generation_type> gen) const {
|
||||
return create_links_common(sst, dir.native(), gen.value_or(sst._generation), link_mode::default_mode);
|
||||
return create_links_common(sst, dir.native(), gen.value_or(sst._generation), mark_for_removal::no);
|
||||
}
|
||||
|
||||
future<> filesystem_storage::create_links(const sstable& sst, const std::filesystem::path& dir) const {
|
||||
return create_links_common(sst, dir.native(), sst._generation, link_mode::default_mode);
|
||||
return create_links_common(sst, dir.native(), sst._generation, mark_for_removal::no);
|
||||
}
|
||||
|
||||
future<> filesystem_storage::snapshot(const sstable& sst, sstring dir, absolute_path abs, std::optional<generation_type> gen, storage::leave_unsealed leave_unsealed) const {
|
||||
future<> filesystem_storage::snapshot(const sstable& sst, sstring dir, absolute_path abs, std::optional<generation_type> gen) const {
|
||||
std::filesystem::path snapshot_dir;
|
||||
if (abs) {
|
||||
snapshot_dir = dir;
|
||||
@@ -432,11 +405,7 @@ future<> filesystem_storage::snapshot(const sstable& sst, sstring dir, absolute_
|
||||
snapshot_dir = _dir.path() / dir;
|
||||
}
|
||||
co_await sst.sstable_touch_directory_io_check(snapshot_dir);
|
||||
if (leave_unsealed) {
|
||||
co_await create_links_common(sst, snapshot_dir, std::move(gen), leave_unsealed_tag{});
|
||||
} else {
|
||||
co_await create_links_common(sst, snapshot_dir, std::move(gen));
|
||||
}
|
||||
co_await create_links_common(sst, snapshot_dir, std::move(gen));
|
||||
}
|
||||
|
||||
future<> filesystem_storage::move(const sstable& sst, sstring new_dir, generation_type new_generation, delayed_commit_changes* delay_commit) {
|
||||
@@ -444,7 +413,7 @@ future<> filesystem_storage::move(const sstable& sst, sstring new_dir, generatio
|
||||
sstring old_dir = _dir.native();
|
||||
sstlog.debug("Moving {} old_generation={} to {} new_generation={} do_sync_dirs={}",
|
||||
sst.get_filename(), sst._generation, new_dir, new_generation, delay_commit == nullptr);
|
||||
co_await create_links_common(sst, new_dir, new_generation, mark_for_removal_tag{});
|
||||
co_await create_links_common(sst, new_dir, new_generation, mark_for_removal::yes);
|
||||
co_await change_dir(new_dir);
|
||||
generation_type old_generation = sst._generation;
|
||||
co_await coroutine::parallel_for_each(sst.all_components(), [&sst, old_generation, old_dir] (auto p) {
|
||||
@@ -629,7 +598,7 @@ public:
|
||||
{}
|
||||
|
||||
future<> seal(const sstable& sst) override;
|
||||
future<> snapshot(const sstable& sst, sstring dir, absolute_path abs, std::optional<generation_type>, storage::leave_unsealed) const override;
|
||||
future<> snapshot(const sstable& sst, sstring dir, absolute_path abs, std::optional<generation_type>) const override;
|
||||
future<> change_state(const sstable& sst, sstable_state state, generation_type generation, delayed_commit_changes* delay) override;
|
||||
// runs in async context
|
||||
void open(sstable& sst) override;
|
||||
@@ -846,7 +815,7 @@ future<> object_storage_base::unlink_component(const sstable& sst, component_typ
|
||||
}
|
||||
}
|
||||
|
||||
future<> object_storage_base::snapshot(const sstable& sst, sstring dir, absolute_path abs, std::optional<generation_type> gen, storage::leave_unsealed) const {
|
||||
future<> object_storage_base::snapshot(const sstable& sst, sstring dir, absolute_path abs, std::optional<generation_type> gen) const {
|
||||
on_internal_error(sstlog, "Snapshotting S3 objects not implemented");
|
||||
co_return;
|
||||
}
|
||||
|
||||
@@ -97,10 +97,9 @@ public:
|
||||
|
||||
using absolute_path = bool_class<class absolute_path_tag>; // FIXME -- should go away eventually
|
||||
using sync_dir = bool_class<struct sync_dir_tag>; // meaningful only to filesystem storage
|
||||
using leave_unsealed = bool_class<struct leave_unsealed_tag>;
|
||||
|
||||
virtual future<> seal(const sstable& sst) = 0;
|
||||
virtual future<> snapshot(const sstable& sst, sstring dir, absolute_path abs, std::optional<generation_type> gen = {}, leave_unsealed lu = leave_unsealed::no) const = 0;
|
||||
virtual future<> snapshot(const sstable& sst, sstring dir, absolute_path abs, std::optional<generation_type> gen = {}) const = 0;
|
||||
virtual future<> change_state(const sstable& sst, sstable_state to, generation_type generation, delayed_commit_changes* delay) = 0;
|
||||
// runs in async context
|
||||
virtual void open(sstable& sst) = 0;
|
||||
|
||||
@@ -63,45 +63,30 @@ mutation_reader_consumer make_streaming_consumer(sstring origin,
|
||||
}
|
||||
schema_ptr s = reader.schema();
|
||||
|
||||
// SSTable will be only sealed when added to the sstable set, so we make sure unsplit sstables aren't
|
||||
// left sealed on the table directory.
|
||||
auto cfg = cf->get_sstables_manager().configure_writer(origin);
|
||||
cfg.leave_unsealed = true;
|
||||
return sst->write_components(std::move(reader), adjusted_estimated_partitions, s,
|
||||
cfg, encoding_stats{}).then([sst] {
|
||||
return sst->open_data();
|
||||
}).then([cf, sst, offstrategy, origin, repaired_at, sstable_list_to_mark_as_repaired, frozen_guard, cfg] -> future<std::vector<sstables::shared_sstable>> {
|
||||
auto on_add = [sst, origin, repaired_at, sstable_list_to_mark_as_repaired, frozen_guard, cfg] (sstables::shared_sstable loading_sst) -> future<> {
|
||||
if (repaired_at && sstables::repair_origin == origin) {
|
||||
loading_sst->being_repaired = frozen_guard;
|
||||
if (sstable_list_to_mark_as_repaired) {
|
||||
sstable_list_to_mark_as_repaired->insert(loading_sst);
|
||||
}
|
||||
}).then([cf, sst, offstrategy, origin, repaired_at, sstable_list_to_mark_as_repaired, frozen_guard] -> future<> {
|
||||
if (repaired_at && sstables::repair_origin == origin) {
|
||||
sst->being_repaired = frozen_guard;
|
||||
if (sstable_list_to_mark_as_repaired) {
|
||||
sstable_list_to_mark_as_repaired->insert(sst);
|
||||
}
|
||||
if (loading_sst == sst) {
|
||||
co_await loading_sst->seal_sstable(cfg.backup);
|
||||
}
|
||||
co_return;
|
||||
};
|
||||
}
|
||||
if (offstrategy && sstables::repair_origin == origin) {
|
||||
sstables::sstlog.debug("Enabled automatic off-strategy trigger for table {}.{}",
|
||||
cf->schema()->ks_name(), cf->schema()->cf_name());
|
||||
cf->enable_off_strategy_trigger();
|
||||
}
|
||||
co_return co_await cf->add_new_sstable_and_update_cache(sst, on_add, offstrategy);
|
||||
}).then([cf, s, sst, use_view_update_path, &vb, &vbw] (std::vector<sstables::shared_sstable> new_sstables) mutable -> future<> {
|
||||
auto& vb_ = vb;
|
||||
auto new_sstables_ = std::move(new_sstables);
|
||||
auto table = cf;
|
||||
|
||||
co_await cf->add_sstable_and_update_cache(sst, offstrategy);
|
||||
}).then([cf, s, sst, use_view_update_path, &vb, &vbw]() mutable -> future<> {
|
||||
if (use_view_update_path == db::view::sstable_destination_decision::staging_managed_by_vbc) {
|
||||
co_return co_await vbw.local().register_staging_sstable_tasks(new_sstables_, cf->schema()->id());
|
||||
return vbw.local().register_staging_sstable_tasks({sst}, cf->schema()->id());
|
||||
} else if (use_view_update_path == db::view::sstable_destination_decision::staging_directly_to_generator) {
|
||||
co_await coroutine::parallel_for_each(new_sstables_, [&vb_, &table] (sstables::shared_sstable sst) -> future<> {
|
||||
return vb_.local().register_staging_sstable(sst, table);
|
||||
});
|
||||
return vb.local().register_staging_sstable(sst, std::move(cf));
|
||||
}
|
||||
co_return;
|
||||
return make_ready_future<>();
|
||||
});
|
||||
};
|
||||
if (!offstrategy) {
|
||||
|
||||
@@ -52,16 +52,8 @@ static future<> load_sstable_for_tablet(const file_stream_id& ops_id, replica::d
|
||||
auto erm = t.get_effective_replication_map();
|
||||
auto& sstm = t.get_sstables_manager();
|
||||
auto sst = sstm.make_sstable(t.schema(), t.get_storage_options(), desc.generation, state, desc.version, desc.format);
|
||||
sstables::sstable_open_config cfg { .unsealed_sstable = true };
|
||||
co_await sst->load(erm->get_sharder(*t.schema()), cfg);
|
||||
auto on_add = [sst, &sstm] (sstables::shared_sstable loading_sst) -> future<> {
|
||||
if (loading_sst == sst) {
|
||||
auto cfg = sstm.configure_writer(sst->get_origin());
|
||||
co_await loading_sst->seal_sstable(cfg.backup);
|
||||
}
|
||||
co_return;
|
||||
};
|
||||
auto new_sstables = co_await t.add_new_sstable_and_update_cache(sst, on_add);
|
||||
co_await sst->load(erm->get_sharder(*t.schema()));
|
||||
co_await t.add_sstable_and_update_cache(sst);
|
||||
blogger.info("stream_sstables[{}] Loaded sstable {} successfully", ops_id, sst->toc_filename());
|
||||
|
||||
if (state == sstables::sstable_state::staging) {
|
||||
@@ -72,7 +64,7 @@ static future<> load_sstable_for_tablet(const file_stream_id& ops_id, replica::d
|
||||
// so then, the view building coordinator can decide to process it once the migration
|
||||
// is finished.
|
||||
// (Instead of registering the sstable to view update generator which may process it immediately.)
|
||||
co_await sharded_vbw.local().register_staging_sstable_tasks(new_sstables, t.schema()->id());
|
||||
co_await sharded_vbw.local().register_staging_sstable_tasks({sst}, t.schema()->id());
|
||||
}
|
||||
});
|
||||
}
|
||||
@@ -351,11 +343,7 @@ future<> stream_blob_handler(replica::database& db, db::view::view_building_work
|
||||
|
||||
auto& table = db.find_column_family(meta.table);
|
||||
auto& sstm = table.get_sstables_manager();
|
||||
// SSTable will be only sealed when added to the sstable set, so we make sure unsplit sstables aren't
|
||||
// left sealed on the table directory.
|
||||
sstables::sstable_stream_sink_cfg cfg { .last_component = meta.fops == file_ops::load_sstables,
|
||||
.leave_unsealed = true };
|
||||
auto sstable_sink = sstables::create_stream_sink(table.schema(), sstm, table.get_storage_options(), sstable_state(meta), meta.filename, cfg);
|
||||
auto sstable_sink = sstables::create_stream_sink(table.schema(), sstm, table.get_storage_options(), sstable_state(meta), meta.filename, meta.fops == file_ops::load_sstables);
|
||||
auto out = co_await sstable_sink->output(foptions, stream_options);
|
||||
co_return output_result{
|
||||
[sstable_sink = std::move(sstable_sink), &meta, &db, &vbw](store_result res) -> future<> {
|
||||
@@ -363,7 +351,7 @@ future<> stream_blob_handler(replica::database& db, db::view::view_building_work
|
||||
co_await sstable_sink->abort();
|
||||
co_return;
|
||||
}
|
||||
auto sst = co_await sstable_sink->close();
|
||||
auto sst = co_await sstable_sink->close_and_seal();
|
||||
if (sst) {
|
||||
blogger.debug("stream_sstables[{}] Loading sstable {} on shard {}", meta.ops_id, sst->toc_filename(), meta.dst_shard_id);
|
||||
auto desc = sst->get_descriptor(sstables::component_type::TOC);
|
||||
|
||||
@@ -110,7 +110,7 @@ public:
|
||||
virtual compaction::compaction_strategy_state& get_compaction_strategy_state() noexcept override { return _compaction_strategy_state; }
|
||||
virtual reader_permit make_compaction_reader_permit() const override { return _semaphore.make_permit(); }
|
||||
virtual sstables::sstables_manager& get_sstables_manager() noexcept override { return _sst_man; }
|
||||
virtual sstables::shared_sstable make_sstable(sstables::sstable_state) const override { return _sstable_factory(); }
|
||||
virtual sstables::shared_sstable make_sstable() const override { return _sstable_factory(); }
|
||||
virtual sstables::sstable_writer_config configure_writer(sstring origin) const override { return _sst_man.configure_writer(std::move(origin)); }
|
||||
virtual api::timestamp_type min_memtable_timestamp() const override { return api::min_timestamp; }
|
||||
virtual api::timestamp_type min_memtable_live_timestamp() const override { return api::min_timestamp; }
|
||||
|
||||
@@ -387,27 +387,4 @@ SEASTAR_TEST_CASE(select_from_vector_indexed_table) {
|
||||
enable_tablets(db_config_with_auth()));
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(select_from_vector_search_system_table) {
|
||||
return do_with_cql_env_thread(
|
||||
[](auto&& env) {
|
||||
create_user_if_not_exists(env, bob);
|
||||
with_user(env, bob, [&env] {
|
||||
BOOST_REQUIRE_EXCEPTION(env.execute_cql("SELECT * FROM system.group0_history").get(), exceptions::unauthorized_exception,
|
||||
exception_predicate::message_contains("User bob has none of the permissions (VECTOR_SEARCH_INDEXING, SELECT) on"));
|
||||
});
|
||||
with_user(env, bob, [&env] {
|
||||
BOOST_REQUIRE_EXCEPTION(env.execute_cql("SELECT * FROM system.versions").get(), exceptions::unauthorized_exception,
|
||||
exception_predicate::message_contains("User bob has none of the permissions (VECTOR_SEARCH_INDEXING, SELECT) on"));
|
||||
});
|
||||
cquery_nofail(env, "GRANT VECTOR_SEARCH_INDEXING ON ALL KEYSPACES TO bob");
|
||||
with_user(env, bob, [&env] {
|
||||
cquery_nofail(env, "SELECT * FROM system.group0_history");
|
||||
});
|
||||
with_user(env, bob, [&env] {
|
||||
cquery_nofail(env, "SELECT * FROM system.versions");
|
||||
});
|
||||
},
|
||||
db_config_with_auth());
|
||||
}
|
||||
|
||||
BOOST_AUTO_TEST_SUITE_END()
|
||||
|
||||
@@ -6275,11 +6275,11 @@ SEASTAR_TEST_CASE(splitting_compaction_test) {
|
||||
|
||||
auto& cm = t->get_compaction_manager();
|
||||
auto split_opt = compaction::compaction_type_options::split{classify_fn};
|
||||
auto new_ssts = cm.maybe_split_new_sstable(input, t.as_compaction_group_view(), split_opt).get();
|
||||
auto new_ssts = cm.maybe_split_sstable(input, t.as_compaction_group_view(), split_opt).get();
|
||||
BOOST_REQUIRE(new_ssts.size() == expected_output_size);
|
||||
for (auto& sst : new_ssts) {
|
||||
// split sstables don't require further split.
|
||||
auto ssts = cm.maybe_split_new_sstable(sst, t.as_compaction_group_view(), split_opt).get();
|
||||
auto ssts = cm.maybe_split_sstable(sst, t.as_compaction_group_view(), split_opt).get();
|
||||
BOOST_REQUIRE(ssts.size() == 1);
|
||||
BOOST_REQUIRE(ssts.front() == sst);
|
||||
}
|
||||
@@ -6291,97 +6291,9 @@ SEASTAR_TEST_CASE(splitting_compaction_test) {
|
||||
}
|
||||
return classify_fn(t);
|
||||
};
|
||||
BOOST_REQUIRE_THROW(cm.maybe_split_new_sstable(input, t.as_compaction_group_view(), compaction::compaction_type_options::split{throwing_classifier}).get(),
|
||||
BOOST_REQUIRE_THROW(cm.maybe_split_sstable(input, t.as_compaction_group_view(), compaction::compaction_type_options::split{throwing_classifier}).get(),
|
||||
std::runtime_error);
|
||||
});
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(unsealed_sstable_compaction_test) {
|
||||
BOOST_REQUIRE(smp::count == 1);
|
||||
return test_env::do_with_async([] (test_env& env) {
|
||||
auto s = schema_builder("tests", "unsealed_sstable_compaction_test")
|
||||
.with_column("id", utf8_type, column_kind::partition_key)
|
||||
.with_column("value", int32_type).build();
|
||||
|
||||
auto t = env.make_table_for_tests(s);
|
||||
auto close_t = deferred_stop(t);
|
||||
t->start();
|
||||
|
||||
mutation mut(s, partition_key::from_exploded(*s, {to_bytes("alpha")}));
|
||||
mut.set_clustered_cell(clustering_key::make_empty(), bytes("value"), data_value(int32_t(1)), 0);
|
||||
|
||||
sstable_writer_config sst_cfg = env.manager().configure_writer();
|
||||
sst_cfg.leave_unsealed = true;
|
||||
auto unsealed_sstable = make_sstable_easy(env, make_mutation_reader_from_mutations(s, env.make_reader_permit(), std::move(mut)), sst_cfg);
|
||||
|
||||
BOOST_REQUIRE(file_exists(unsealed_sstable->get_filename(sstables::component_type::TemporaryTOC).format()).get());
|
||||
|
||||
auto sst_gen = env.make_sst_factory(s);
|
||||
auto info = compact_sstables(env, compaction::compaction_descriptor({ unsealed_sstable }), t, sst_gen).get();
|
||||
BOOST_REQUIRE(info.new_sstables.size() == 1);
|
||||
});
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(sstable_clone_leaving_unsealed_dest_sstable) {
|
||||
return test_env::do_with_async([] (test_env& env) {
|
||||
simple_schema ss;
|
||||
auto s = ss.schema();
|
||||
auto pk = ss.make_pkey();
|
||||
|
||||
auto mut1 = mutation(s, pk);
|
||||
mut1.partition().apply_insert(*s, ss.make_ckey(0), ss.new_timestamp());
|
||||
auto sst = make_sstable_containing(env.make_sstable(s), {std::move(mut1)});
|
||||
|
||||
auto table = env.make_table_for_tests(s);
|
||||
auto close_table = deferred_stop(table);
|
||||
|
||||
sstables::sstable_generation_generator gen_generator;
|
||||
|
||||
bool leave_unsealed = true;
|
||||
auto d = sst->clone(gen_generator(), leave_unsealed).get();
|
||||
|
||||
auto sst2 = env.make_sstable(s, d.generation, d.version, d.format);
|
||||
sst2->load(s->get_sharder(), sstable_open_config{ .unsealed_sstable = leave_unsealed }).get();
|
||||
BOOST_REQUIRE(!file_exists(sst2->get_filename(sstables::component_type::TOC).format()).get());
|
||||
BOOST_REQUIRE(file_exists(sst2->get_filename(sstables::component_type::TemporaryTOC).format()).get());
|
||||
|
||||
leave_unsealed = false;
|
||||
d = sst->clone(gen_generator(), leave_unsealed).get();
|
||||
|
||||
auto sst3 = env.make_sstable(s, d.generation, d.version, d.format);
|
||||
sst3->load(s->get_sharder(), sstable_open_config{ .unsealed_sstable = leave_unsealed }).get();
|
||||
BOOST_REQUIRE(file_exists(sst3->get_filename(sstables::component_type::TOC).format()).get());
|
||||
BOOST_REQUIRE(!file_exists(sst3->get_filename(sstables::component_type::TemporaryTOC).format()).get());
|
||||
});
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(failure_when_adding_new_sstable_test) {
|
||||
return test_env::do_with_async([] (test_env& env) {
|
||||
simple_schema ss;
|
||||
auto s = ss.schema();
|
||||
auto pk = ss.make_pkey();
|
||||
|
||||
auto mut1 = mutation(s, pk);
|
||||
mut1.partition().apply_insert(*s, ss.make_ckey(0), ss.new_timestamp());
|
||||
auto sst = make_sstable_containing(env.make_sstable(s), {mut1});
|
||||
|
||||
auto table = env.make_table_for_tests(s);
|
||||
auto close_table = deferred_stop(table);
|
||||
|
||||
auto on_add = [] (sstables::shared_sstable) { throw std::runtime_error("fail to seal"); return make_ready_future<>(); };
|
||||
BOOST_REQUIRE_THROW(table->add_new_sstable_and_update_cache(sst, on_add).get(), std::runtime_error);
|
||||
|
||||
// Verify new sstable was unlinked on failure.
|
||||
BOOST_REQUIRE(!file_exists(sst->get_filename(sstables::component_type::Data).format()).get());
|
||||
|
||||
auto sst2 = make_sstable_containing(env.make_sstable(s), {mut1});
|
||||
auto sst3 = make_sstable_containing(env.make_sstable(s), {mut1});
|
||||
BOOST_REQUIRE_THROW(table->add_new_sstables_and_update_cache({sst2, sst3}, on_add).get(), std::runtime_error);
|
||||
|
||||
// Verify both sstables are unlinked on failure.
|
||||
BOOST_REQUIRE(!file_exists(sst2->get_filename(sstables::component_type::Data).format()).get());
|
||||
BOOST_REQUIRE(!file_exists(sst3->get_filename(sstables::component_type::Data).format()).get());
|
||||
});
|
||||
}
|
||||
|
||||
BOOST_AUTO_TEST_SUITE_END()
|
||||
|
||||
@@ -17,7 +17,6 @@ from concurrent.futures.thread import ThreadPoolExecutor
|
||||
from multiprocessing import Event
|
||||
from pathlib import Path
|
||||
from typing import TYPE_CHECKING
|
||||
from test import TOP_SRC_DIR, path_to
|
||||
from test.pylib.runner import testpy_test_fixture_scope
|
||||
from test.pylib.random_tables import RandomTables
|
||||
from test.pylib.util import unique_name
|
||||
@@ -59,20 +58,6 @@ logger = logging.getLogger(__name__)
|
||||
print(f"Driver name {DRIVER_NAME}, version {DRIVER_VERSION}")
|
||||
|
||||
|
||||
async def decode_backtrace(build_mode: str, input: str):
|
||||
executable = Path(path_to(build_mode, "scylla"))
|
||||
proc = await asyncio.create_subprocess_exec(
|
||||
(TOP_SRC_DIR / "seastar" / "scripts" / "seastar-addr2line").absolute(),
|
||||
"-e",
|
||||
executable.absolute(),
|
||||
stdin=asyncio.subprocess.PIPE,
|
||||
stdout=asyncio.subprocess.PIPE,
|
||||
stderr=asyncio.subprocess.PIPE,
|
||||
)
|
||||
stdout, stderr = await proc.communicate(input=input.encode())
|
||||
return f"{stdout.decode()}\n{stderr.decode()}"
|
||||
|
||||
|
||||
def pytest_addoption(parser):
|
||||
parser.addoption('--manager-api', action='store',
|
||||
help='Manager unix socket path')
|
||||
@@ -258,11 +243,6 @@ async def manager(request: pytest.FixtureRequest,
|
||||
# test failure.
|
||||
report = request.node.stash[PHASE_REPORT_KEY]
|
||||
failed = report.when == "call" and report.failed
|
||||
|
||||
# Check if the test has the check_nodes_for_errors marker
|
||||
found_errors = await manager_client.check_all_errors(check_all_errors=(request.node.get_closest_marker("check_nodes_for_errors") is not None))
|
||||
failed = failed or found_errors
|
||||
|
||||
if failed:
|
||||
# Save scylladb logs for failed tests in a separate directory and copy XML report to the same directory to have
|
||||
# all related logs in one dir.
|
||||
@@ -286,44 +266,10 @@ async def manager(request: pytest.FixtureRequest,
|
||||
await manager_client.stop() # Stop client session and close driver after each test
|
||||
if cluster_status["server_broken"]:
|
||||
pytest.fail(
|
||||
f"test case {test_case_name} left unfinished tasks on Scylla server. Server marked as broken,"
|
||||
f"test case {test_case_name} leave unfinished tasks on Scylla server. Server marked as broken,"
|
||||
f" server_broken_reason: {cluster_status["message"]}"
|
||||
)
|
||||
if found_errors:
|
||||
full_message = []
|
||||
for server, data in found_errors.items():
|
||||
summary = []
|
||||
detailed = []
|
||||
|
||||
if criticals := data.get("critical", []):
|
||||
summary.append(f"{len(criticals)} critical error(s)")
|
||||
detailed.extend(map(str.rstrip, criticals))
|
||||
|
||||
if backtraces := data.get("backtraces", []):
|
||||
summary.append(f"{len(backtraces)} backtrace(s)")
|
||||
with open(failed_test_dir_path / f"scylla-{server.server_id}-backtraces.txt", "w") as bt_file:
|
||||
for backtrace in backtraces:
|
||||
bt_file.write(backtrace + "\n\n")
|
||||
decoded_bt = await decode_backtrace(build_mode, backtrace)
|
||||
bt_file.write(decoded_bt + "\n\n")
|
||||
detailed.append(f"{len(backtraces)} backtrace(s) saved in {Path(bt_file.name).name}")
|
||||
|
||||
if errors := data.get("error", []):
|
||||
summary.append(f"{len(errors)} error(s)")
|
||||
detailed.extend(map(str.rstrip, errors))
|
||||
|
||||
if cores := data.get("cores", []):
|
||||
summary.append(f"{len(cores)} core(s): {', '.join(cores)}")
|
||||
|
||||
if summary:
|
||||
summary_line = f"Server {server.server_id}: found {', '.join(summary)} (log: { data['log']})"
|
||||
detailed = [f" {line}" for line in detailed]
|
||||
full_message.append(summary_line)
|
||||
full_message.extend(detailed)
|
||||
|
||||
with open(failed_test_dir_path / "found_errors.txt", "w") as f:
|
||||
f.write("\n".join(full_message))
|
||||
pytest.fail(f"\n{'\n'.join(full_message)}")
|
||||
|
||||
# "cql" fixture: set up client object for communicating with the CQL API.
|
||||
# Since connection is managed by manager just return that object
|
||||
|
||||
@@ -110,9 +110,6 @@ def fixture_dtest_setup(request: FixtureRequest,
|
||||
except Exception as e: # noqa: BLE001
|
||||
logger.error("Error stopping cluster: %s", str(e))
|
||||
|
||||
manager.ignore_log_patterns.extend(dtest_setup.ignore_log_patterns)
|
||||
manager.ignore_cores_log_patterns.extend(dtest_setup.ignore_cores_log_patterns)
|
||||
|
||||
try:
|
||||
if not dtest_setup.allow_log_errors:
|
||||
exclude_errors = []
|
||||
|
||||
@@ -14,7 +14,7 @@ import time
|
||||
from collections import defaultdict
|
||||
from functools import cached_property
|
||||
from functools import wraps
|
||||
from typing import List, Dict, Callable, Optional, Tuple
|
||||
from typing import List, Dict, Callable
|
||||
|
||||
from cassandra import ConsistencyLevel
|
||||
from cassandra import WriteTimeout, ReadTimeout, OperationTimedOut
|
||||
@@ -64,7 +64,6 @@ class Worker:
|
||||
"""
|
||||
A single worker increments its dedicated column `s{i}` via LWT:
|
||||
UPDATE .. SET s{i}=? WHERE pk=? IF <guards on other cols> AND s{i}=?
|
||||
bump global phase-ops counter via on_applied()
|
||||
It checks for applied state and retries on "uncertainty" timeouts.
|
||||
"""
|
||||
def __init__(
|
||||
@@ -77,10 +76,8 @@ class Worker:
|
||||
other_columns: List[int],
|
||||
get_lower_bound: Callable[[int, int], int],
|
||||
on_applied: Callable[[int, int, int], None],
|
||||
stop_event: asyncio.Event,
|
||||
counter_update_statement: Optional[PreparedStatement] = None,
|
||||
counters_random_delta: bool = False,
|
||||
counters_max_delta: int = 5,
|
||||
stop_event: asyncio.Event
|
||||
|
||||
):
|
||||
self.stop_event = stop_event
|
||||
self.success_counts: Dict[int, int] = {pk: 0 for pk in pks}
|
||||
@@ -94,11 +91,7 @@ class Worker:
|
||||
self.cql = cql
|
||||
self.get_lower_bound = get_lower_bound
|
||||
self.on_applied = on_applied
|
||||
# counters
|
||||
self.counter_update_statement = counter_update_statement
|
||||
self.counters_random_delta = counters_random_delta
|
||||
self.counters_max_delta = max(1, counters_max_delta)
|
||||
self.counter_deltas: Dict[int, int] = {pk: 0 for pk in pks}
|
||||
|
||||
|
||||
async def verify_update_through_select(self, pk, new_val, prev_val):
|
||||
"""
|
||||
@@ -113,24 +106,6 @@ class Worker:
|
||||
assert current_val == new_val or current_val == prev_val
|
||||
return current_val == new_val
|
||||
|
||||
def _next_counter_delta(self) -> int:
|
||||
"""
|
||||
Compute the next delta to apply to the counter table.
|
||||
If random mode is disabled -> always +1.
|
||||
If random mode is enabled -> random value from
|
||||
[-max_delta..-1] U [1..max_delta].
|
||||
"""
|
||||
if not self.counters_random_delta:
|
||||
return 1
|
||||
# Avoid 0 by choosing magnitude in [1, max] and random sign.
|
||||
mag = self.rng.randint(1, self.counters_max_delta)
|
||||
sign = -1 if self.rng.random() < 0.5 else 1
|
||||
return sign * mag
|
||||
|
||||
async def _inc_counter(self, pk: int, delta: int) -> None:
|
||||
stmt = self.counter_update_statement.bind([delta, pk])
|
||||
stmt.consistency_level = ConsistencyLevel.LOCAL_QUORUM
|
||||
await self.cql.run_async(stmt)
|
||||
|
||||
def stop(self) -> None:
|
||||
self.stop_event.set()
|
||||
@@ -195,11 +170,6 @@ class Worker:
|
||||
self.on_applied(pk, self.worker_id, new_val)
|
||||
self.success_counts[pk] += 1
|
||||
|
||||
if self.counter_update_statement:
|
||||
delta = self._next_counter_delta()
|
||||
self.counter_deltas[pk] += delta
|
||||
await self._inc_counter(pk, delta)
|
||||
|
||||
await asyncio.sleep(0.1)
|
||||
|
||||
except Exception:
|
||||
@@ -217,8 +187,7 @@ class BaseLWTTester:
|
||||
|
||||
def __init__(
|
||||
self, manager: ManagerClient, ks: str, tbl: str,
|
||||
num_workers: int = DEFAULT_WORKERS, num_keys: int = DEFAULT_NUM_KEYS, use_counters: bool = False,
|
||||
counters_random_delta: bool = False, counters_max_delta: int = 5, counter_tbl: Optional[str] = None
|
||||
num_workers: int = DEFAULT_WORKERS, num_keys: int = DEFAULT_NUM_KEYS
|
||||
):
|
||||
self.ks = ks
|
||||
self.tbl = tbl
|
||||
@@ -233,12 +202,6 @@ class BaseLWTTester:
|
||||
self.migrations = 0
|
||||
self.phase = "warmup" # "warmup" -> "migrating" -> "post"
|
||||
self.phase_ops = defaultdict(int)
|
||||
# counters config
|
||||
self.use_counters = use_counters
|
||||
self.counters_random_delta = counters_random_delta
|
||||
self.counters_max_delta = counters_max_delta
|
||||
self.counter_tbl = counter_tbl or (f"{tbl}_ctr" if use_counters else None)
|
||||
|
||||
|
||||
def _get_lower_bound(self, pk: int, col_idx: int) -> int:
|
||||
return self.lb_counts[pk][col_idx]
|
||||
@@ -270,14 +233,6 @@ class BaseLWTTester:
|
||||
|
||||
def create_workers(self, stop_event) -> List[Worker]:
|
||||
workers: List[Worker] = []
|
||||
|
||||
counter_stmt: Optional[PreparedStatement] = None
|
||||
if self.use_counters:
|
||||
counter_stmt = self.cql.prepare(
|
||||
f"UPDATE {self.ks}.{self.counter_tbl} "
|
||||
f"SET c = c + ? WHERE pk = ?"
|
||||
)
|
||||
|
||||
for i in range(self.num_workers):
|
||||
other_columns = [j for j in range(self.num_workers) if j != i]
|
||||
cond = " AND ".join([*(f"s{j} >= ?" for j in other_columns), f"s{i} = ?"])
|
||||
@@ -292,9 +247,6 @@ class BaseLWTTester:
|
||||
other_columns=other_columns,
|
||||
get_lower_bound=self._get_lower_bound,
|
||||
on_applied=self._on_applied,
|
||||
counter_update_statement=counter_stmt,
|
||||
counters_random_delta=self.counters_random_delta,
|
||||
counters_max_delta=self.counters_max_delta,
|
||||
)
|
||||
workers.append(worker)
|
||||
return workers
|
||||
@@ -306,11 +258,6 @@ class BaseLWTTester:
|
||||
f"CREATE TABLE {self.ks}.{self.tbl} (pk int PRIMARY KEY, {cols_def})"
|
||||
)
|
||||
logger.info("Created table %s.%s with %d columns", self.ks, self.tbl, self.num_workers)
|
||||
if self.use_counters:
|
||||
await self.cql.run_async(
|
||||
f"CREATE TABLE {self.ks}.{self.counter_tbl} (pk int PRIMARY KEY, c counter)"
|
||||
)
|
||||
logger.info("Created counter table %s.%s", self.ks, self.counter_tbl)
|
||||
|
||||
async def initialize_rows(self):
|
||||
"""
|
||||
@@ -349,7 +296,7 @@ class BaseLWTTester:
|
||||
assert not errs, f"worker errors: {errs}"
|
||||
logger.info("All workers stopped")
|
||||
|
||||
async def _verify_base_table(self):
|
||||
async def verify_consistency(self):
|
||||
"""Ensure every (pk, column) reflects the number of successful CAS writes."""
|
||||
# Run SELECTs for all PKs in parallel using prepared statement
|
||||
tasks = []
|
||||
@@ -373,35 +320,6 @@ class BaseLWTTester:
|
||||
total_ops = sum(sum(w.success_counts.values()) for w in self.workers)
|
||||
logger.info("Consistency verified – %d total successful CAS operations", total_ops)
|
||||
|
||||
async def _verify_counters(self):
|
||||
if not self.use_counters:
|
||||
return
|
||||
|
||||
stmt = SimpleStatement(
|
||||
f"SELECT pk, c FROM {self.ks}.{self.counter_tbl}",
|
||||
consistency_level=ConsistencyLevel.LOCAL_QUORUM,
|
||||
)
|
||||
|
||||
rows = await self.cql.run_async(stmt)
|
||||
db_values: Dict[int, int] = {row.pk: row.c for row in rows}
|
||||
|
||||
mismatches = []
|
||||
for pk in self.pks:
|
||||
actual = db_values.get(pk, 0)
|
||||
expected = sum(worker.counter_deltas.get(pk, 0) for worker in self.workers)
|
||||
if actual != expected:
|
||||
mismatches.append(
|
||||
f"counter mismatch pk={pk} c={actual}, expected={expected}"
|
||||
)
|
||||
|
||||
assert not mismatches, "Counter consistency violations: " + "; ".join(mismatches)
|
||||
total_delta = sum(sum(worker.counter_deltas.values()) for worker in self.workers)
|
||||
logger.info("Counter table consistency verified – total delta=%d", total_delta)
|
||||
|
||||
async def verify_consistency(self):
|
||||
await self._verify_base_table()
|
||||
await self._verify_counters()
|
||||
|
||||
|
||||
async def get_token_for_pk(cql, ks: str, tbl: str, pk: int) -> int:
|
||||
"""Get the token for a given primary key"""
|
||||
|
||||
@@ -1,412 +0,0 @@
|
||||
# Copyright (C) 2025-present ScyllaDB
|
||||
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
||||
|
||||
import asyncio
|
||||
import logging
|
||||
import random
|
||||
import re
|
||||
import time
|
||||
from typing import Dict
|
||||
|
||||
import pytest
|
||||
from test.cluster.conftest import skip_mode
|
||||
from test.cluster.lwt.lwt_common import (
|
||||
BaseLWTTester,
|
||||
DEFAULT_WORKERS,
|
||||
DEFAULT_NUM_KEYS,
|
||||
wait_for_tablet_count
|
||||
)
|
||||
from test.cluster.util import new_test_keyspace
|
||||
from test.pylib.manager_client import ManagerClient
|
||||
from test.pylib.rest_client import HTTPError
|
||||
from test.pylib.tablets import get_tablet_count
|
||||
from test.pylib.tablets import get_tablet_replicas
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
logger.setLevel(logging.INFO)
|
||||
|
||||
|
||||
TARGET_RESIZE_COUNT = 20
|
||||
NUM_MIGRATIONS = 20
|
||||
WARMUP_LWT_CNT = 100
|
||||
POST_LWT_CNT = 100
|
||||
|
||||
PHASE_WARMUP = "warmup"
|
||||
PHASE_RESIZE = "resize"
|
||||
PHASE_POST = "post"
|
||||
|
||||
MIN_TABLETS = 1
|
||||
MAX_TABLETS = 20
|
||||
RESIZE_TIMEOUT = 240
|
||||
MIGRATE_ONE_TIMEOUT_S = 60
|
||||
NO_REPLICA_RE = re.compile(r"has no replica on", re.IGNORECASE)
|
||||
DST_REPLICA_RE = re.compile(r"has replica on", re.IGNORECASE)
|
||||
|
||||
|
||||
def _err_code(e: Exception):
|
||||
return getattr(e, "code", None)
|
||||
|
||||
def _err_text(e: Exception):
|
||||
return getattr(e, "text", "") or str(e)
|
||||
|
||||
def _is_tablet_in_transition_http_error(e: Exception) -> bool:
|
||||
return isinstance(e, HTTPError) and _err_code(e) == 500 and "in transition" in _err_text(e).lower()
|
||||
|
||||
def _is_no_replica_on_src_error(e: Exception) -> bool:
|
||||
return isinstance(e, HTTPError) and _err_code(e) == 500 and NO_REPLICA_RE.search(_err_text(e)) is not None
|
||||
|
||||
def _is_dst_already_replica_error(e: Exception) -> bool:
|
||||
return isinstance(e, HTTPError) and _err_code(e) == 500 and DST_REPLICA_RE.search(_err_text(e)) is not None
|
||||
|
||||
|
||||
async def _move_tablet_with_retry(manager, src_server, ks, tbl,
|
||||
src_host_id, src_shard, dst_host_id, dst_shard, token,
|
||||
*, timeout_s=MIGRATE_ONE_TIMEOUT_S, base_sleep=0.1, max_sleep=2.0):
|
||||
deadline = time.time() + timeout_s
|
||||
sleep = base_sleep
|
||||
while True:
|
||||
try:
|
||||
await manager.api.move_tablet(
|
||||
src_server.ip_addr, ks, tbl,
|
||||
src_host_id, src_shard, dst_host_id, dst_shard, token
|
||||
)
|
||||
return
|
||||
except Exception as e:
|
||||
if _is_tablet_in_transition_http_error(e) and time.time() + sleep < deadline:
|
||||
logger.info("Token %s in transition, retry in %.2fs", token, sleep)
|
||||
await asyncio.sleep(sleep + random.uniform(0, sleep))
|
||||
sleep = min(sleep * 1.7, max_sleep)
|
||||
continue
|
||||
raise
|
||||
|
||||
|
||||
async def tablet_migration_ops(
|
||||
stop_event: asyncio.Event,
|
||||
manager: ManagerClient,
|
||||
servers,
|
||||
tester: BaseLWTTester,
|
||||
table: str,
|
||||
num_ops: int,
|
||||
pause_range=(0.5, 2.0),
|
||||
*,
|
||||
server_properties,
|
||||
) -> None:
|
||||
logger.info("Starting tablet migration ops for %s.%s: target=%d", tester.ks, table, num_ops)
|
||||
migration_count = 0
|
||||
intranode_ratio = 0.3
|
||||
|
||||
# server_id -> rack
|
||||
server_id_to_rack: Dict[str, str] = {
|
||||
s.server_id: prop["rack"] for s, prop in zip(servers, server_properties)
|
||||
}
|
||||
host_ids = await asyncio.gather(
|
||||
*(manager.get_host_id(s.server_id) for s in servers)
|
||||
)
|
||||
# server_id -> host_id и host_id -> server
|
||||
server_id_to_host_id: Dict[str, str] = {
|
||||
s.server_id: hid for s, hid in zip(servers, host_ids)
|
||||
}
|
||||
host_id_to_server = {
|
||||
hid: s for s, hid in zip(servers, host_ids)
|
||||
}
|
||||
|
||||
attempt = 0
|
||||
while not stop_event.is_set() and migration_count < num_ops:
|
||||
attempt += 1
|
||||
sample_pk = random.choice(tester.pks)
|
||||
token = tester.pk_to_token[sample_pk]
|
||||
|
||||
replicas = await get_tablet_replicas(
|
||||
manager, servers[0], tester.ks, table, token
|
||||
)
|
||||
src_host_id, src_shard = random.choice(replicas)
|
||||
src_server = host_id_to_server.get(src_host_id)
|
||||
assert src_server is not None, (
|
||||
f"Source host_id {src_host_id} for token {token} not found in host_id_to_server (attempt {attempt})"
|
||||
)
|
||||
|
||||
if random.random() < intranode_ratio:
|
||||
dst_host_id = src_host_id
|
||||
dst_server = src_server
|
||||
dst_shard = 0 if src_shard != 0 else 1
|
||||
else:
|
||||
replica_hids = {h for (h, _sh) in replicas}
|
||||
src_rack = server_id_to_rack[src_server.server_id]
|
||||
|
||||
same_rack_candidates = [
|
||||
s for s in servers if server_id_to_rack[s.server_id] == src_rack
|
||||
and server_id_to_host_id[s.server_id] not in replica_hids
|
||||
]
|
||||
|
||||
assert same_rack_candidates, (
|
||||
f"No same-rack non-replica candidate for token {token} (attempt {attempt})"
|
||||
)
|
||||
|
||||
dst_server = random.choice(same_rack_candidates)
|
||||
dst_host_id = server_id_to_host_id[dst_server.server_id]
|
||||
dst_shard = 0
|
||||
|
||||
try:
|
||||
await _move_tablet_with_retry(
|
||||
manager, src_server, tester.ks, table,
|
||||
src_host_id, src_shard, dst_host_id, dst_shard, token,
|
||||
timeout_s=60,
|
||||
)
|
||||
|
||||
migration_count += 1
|
||||
logger.info(
|
||||
"Completed migration #%d (token=%s -> %s:%d) for %s.%s",
|
||||
migration_count, token, dst_server.ip_addr, dst_shard, tester.ks, table,
|
||||
)
|
||||
await asyncio.sleep(random.uniform(*pause_range))
|
||||
continue
|
||||
except Exception as e:
|
||||
if _is_tablet_in_transition_http_error(e):
|
||||
logger.info("Token %s in transition, switching token (attempt %d)",
|
||||
token, attempt)
|
||||
continue
|
||||
if _is_no_replica_on_src_error(e) or _is_dst_already_replica_error(e):
|
||||
logger.info("Src replica vanished for token %s, re-pick (attempt %d)",
|
||||
token, attempt)
|
||||
continue
|
||||
raise
|
||||
|
||||
assert migration_count == num_ops, f"Only completed {migration_count}/{num_ops} migrations for {tester.ks}.{table}"
|
||||
logger.info("Completed tablet migration ops for %s.%s: %d/%d", tester.ks, table, migration_count, num_ops)
|
||||
|
||||
|
||||
def powers_of_two_in_range(lo: int, hi: int):
|
||||
if lo > hi or hi < 1:
|
||||
return []
|
||||
lo = max(1, lo)
|
||||
start_e = (lo - 1).bit_length()
|
||||
end_e = hi.bit_length()
|
||||
return [1 << e for e in range(start_e, end_e + 1) if (1 << e) <= hi]
|
||||
|
||||
|
||||
async def run_random_resizes(
|
||||
stop_event_: asyncio.Event,
|
||||
manager: ManagerClient,
|
||||
servers,
|
||||
tester: BaseLWTTester,
|
||||
ks: str,
|
||||
table: str,
|
||||
counter_table: str,
|
||||
target_steps: int = TARGET_RESIZE_COUNT,
|
||||
pause_range=(0.5, 2.0),
|
||||
):
|
||||
"""
|
||||
Perform randomized tablet count changes (splits/merges) on the main LWT table
|
||||
and its counter table. Runs until target resize count is reached or stop_event_
|
||||
is set. Returns a dict with simple stats.
|
||||
"""
|
||||
split_count = 0
|
||||
merge_count = 0
|
||||
current_resize_count = 0
|
||||
pow2_targets = powers_of_two_in_range(MIN_TABLETS, MAX_TABLETS)
|
||||
|
||||
while not stop_event_.is_set() and current_resize_count < target_steps:
|
||||
# Drive resize direction from the main table.
|
||||
current_main = await get_tablet_count(manager, servers[0], ks, table)
|
||||
|
||||
candidates = [t for t in pow2_targets if t != current_main]
|
||||
target_cnt = random.choice(candidates)
|
||||
|
||||
direction = "split" if target_cnt > current_main else "merge"
|
||||
logger.info(
|
||||
"[%s] starting: %s.%s=%d, %s.%s -> target %d",
|
||||
direction.upper(), ks, table, current_main, ks,
|
||||
counter_table, target_cnt
|
||||
)
|
||||
tables = [table, counter_table]
|
||||
# Apply ALTER TABLE to both tables.
|
||||
for tbl in tables:
|
||||
await tester.cql.run_async(
|
||||
f"ALTER TABLE {ks}.{tbl} "
|
||||
f"WITH tablets = {{'min_tablet_count': {target_cnt}}}"
|
||||
)
|
||||
|
||||
if direction == "split":
|
||||
predicate = lambda c, tgt=target_cnt: c >= tgt
|
||||
else:
|
||||
predicate = lambda c, tgt=target_cnt: c <= tgt
|
||||
|
||||
# Wait for both tables to converge.
|
||||
main_after, counter_after = await asyncio.gather(
|
||||
wait_for_tablet_count(
|
||||
manager,
|
||||
servers[0],
|
||||
tester.ks,
|
||||
table,
|
||||
predicate=predicate,
|
||||
target=target_cnt,
|
||||
timeout_s=RESIZE_TIMEOUT,
|
||||
),
|
||||
wait_for_tablet_count(
|
||||
manager,
|
||||
servers[0],
|
||||
tester.ks,
|
||||
counter_table,
|
||||
predicate=predicate,
|
||||
target=target_cnt,
|
||||
timeout_s=RESIZE_TIMEOUT,
|
||||
),
|
||||
)
|
||||
|
||||
# Sanity: both tables should end up with the same tablet count.
|
||||
assert main_after == counter_after, (
|
||||
f"Tablet counts diverged: {ks}.{table}={main_after}, "
|
||||
f"{ks}.{counter_table}={counter_after}"
|
||||
)
|
||||
|
||||
if direction == "split":
|
||||
logger.info(
|
||||
"[SPLIT] converged: %s.%s %d -> %d, %s.%s -> %d (target %d)",
|
||||
ks, table, current_main, main_after, ks, counter_table,
|
||||
counter_after, target_cnt
|
||||
)
|
||||
assert main_after >= current_main, (
|
||||
f"Tablet count expected to increase during split "
|
||||
f"(was {current_main}, now {main_after})"
|
||||
)
|
||||
split_count += 1
|
||||
else:
|
||||
logger.info(
|
||||
"[MERGE] converged: %s.%s %d -> %d, %s.%s -> %d (target %d)",
|
||||
ks, table, current_main, main_after, ks, counter_table,
|
||||
counter_after, target_cnt
|
||||
)
|
||||
assert main_after <= current_main, (
|
||||
f"Tablet count expected to decrease during merge "
|
||||
f"(was {current_main}, now {main_after})"
|
||||
)
|
||||
merge_count += 1
|
||||
|
||||
current_resize_count += 1
|
||||
await asyncio.sleep(random.uniform(*pause_range))
|
||||
|
||||
return {
|
||||
"steps_done": current_resize_count,
|
||||
"seen_split": split_count,
|
||||
"seen_merge": merge_count,
|
||||
}
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@skip_mode("debug", "debug mode is too slow for this test")
|
||||
async def test_multi_column_lwt_migrate_and_random_resizes(manager: ManagerClient):
|
||||
|
||||
cfg = {
|
||||
"enable_tablets": True,
|
||||
"tablet_load_stats_refresh_interval_in_seconds": 1,
|
||||
"target-tablet-size-in-bytes": 1024 * 16,
|
||||
}
|
||||
|
||||
properties = [
|
||||
{"dc": "dc1", "rack": "r1"},
|
||||
{"dc": "dc1", "rack": "r2"},
|
||||
{"dc": "dc1", "rack": "r3"},
|
||||
{"dc": "dc1", "rack": "r1"},
|
||||
{"dc": "dc1", "rack": "r2"},
|
||||
{"dc": "dc1", "rack": "r3"},
|
||||
]
|
||||
|
||||
cmdline = [
|
||||
'--logger-log-level', 'paxos=trace', '--smp=2',
|
||||
]
|
||||
|
||||
servers = await manager.servers_add(6, config=cfg, property_file=properties, cmdline=cmdline)
|
||||
|
||||
async with new_test_keyspace(
|
||||
manager,
|
||||
"WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 3} "
|
||||
"AND tablets = {'initial': 1}",
|
||||
) as ks:
|
||||
stop_event_ = asyncio.Event()
|
||||
table = "lwt_split_merge_table"
|
||||
cnt_table = "lwt_split_merge_counters"
|
||||
tester = BaseLWTTester(
|
||||
manager,
|
||||
ks,
|
||||
table,
|
||||
num_workers=DEFAULT_WORKERS,
|
||||
num_keys=DEFAULT_NUM_KEYS,
|
||||
use_counters=True,
|
||||
counters_random_delta=True,
|
||||
counters_max_delta=5,
|
||||
counter_tbl=cnt_table,
|
||||
)
|
||||
|
||||
await tester.create_schema()
|
||||
await tester.initialize_rows()
|
||||
await tester.start_workers(stop_event_)
|
||||
|
||||
try:
|
||||
# PHASE: warmup
|
||||
tester.set_phase(PHASE_WARMUP)
|
||||
logger.info("LWT warmup: waiting for %d applied CAS", WARMUP_LWT_CNT)
|
||||
await tester.wait_for_phase_ops(stop_event_, PHASE_WARMUP, WARMUP_LWT_CNT, timeout=180, poll=0.2)
|
||||
logger.info("LWT warmup complete: %d ops", tester.get_phase_ops(PHASE_WARMUP))
|
||||
|
||||
# PHASE: resize + migrate
|
||||
tester.set_phase(PHASE_RESIZE)
|
||||
logger.info("Starting RESIZE (random powers-of-two) + %d migrations per table", NUM_MIGRATIONS)
|
||||
|
||||
resize_task = asyncio.create_task(
|
||||
run_random_resizes(
|
||||
stop_event_=stop_event_,
|
||||
manager=manager,
|
||||
servers=servers,
|
||||
tester=tester,
|
||||
ks=ks,
|
||||
table=table,
|
||||
target_steps=TARGET_RESIZE_COUNT,
|
||||
counter_table=cnt_table,
|
||||
)
|
||||
)
|
||||
migrate_task = asyncio.create_task(
|
||||
tablet_migration_ops(
|
||||
stop_event_,
|
||||
manager, servers, tester,
|
||||
num_ops=NUM_MIGRATIONS,
|
||||
pause_range=(0.3, 1.0),
|
||||
server_properties=properties,
|
||||
table=table,
|
||||
)
|
||||
)
|
||||
migrate_cnt_task = asyncio.create_task(
|
||||
tablet_migration_ops(
|
||||
stop_event_,
|
||||
manager, servers, tester,
|
||||
num_ops=NUM_MIGRATIONS,
|
||||
pause_range=(0.3, 1.0),
|
||||
server_properties=properties,
|
||||
table=cnt_table
|
||||
)
|
||||
)
|
||||
|
||||
resize_stats = await resize_task
|
||||
await asyncio.gather(migrate_task, migrate_cnt_task)
|
||||
|
||||
logger.info(
|
||||
"Randomized resize stats: steps_done=%d, split=%d, merge=%d; LWT ops during resize=%d",
|
||||
resize_stats["steps_done"], resize_stats["seen_split"], resize_stats["seen_merge"],
|
||||
tester.get_phase_ops(PHASE_RESIZE),
|
||||
)
|
||||
assert resize_stats["steps_done"] >= 1, "Resize phase performed 0 steps"
|
||||
assert tester.get_phase_ops(PHASE_RESIZE) > 0, "Expected LWT ops during RESIZE phase"
|
||||
|
||||
# PHASE: post
|
||||
tester.set_phase(PHASE_POST)
|
||||
logger.info("LWT post resize: waiting for %d applied CAS", POST_LWT_CNT)
|
||||
await tester.wait_for_phase_ops(stop_event_, PHASE_POST, POST_LWT_CNT, timeout=180, poll=0.2)
|
||||
logger.info("LWT post resize complete: %d ops", tester.get_phase_ops(PHASE_POST))
|
||||
|
||||
total_ops = sum(tester.phase_ops.values())
|
||||
assert total_ops >= (WARMUP_LWT_CNT + POST_LWT_CNT), f"Too few total LWT ops: {total_ops}"
|
||||
|
||||
finally:
|
||||
await tester.stop_workers()
|
||||
|
||||
await tester.verify_consistency()
|
||||
logger.info("Combined LWT during random split/merge + migrations test completed successfully")
|
||||
@@ -121,7 +121,7 @@ async def test_change_two(manager, random_tables, build_mode):
|
||||
await manager.server_update_config(servers[1].server_id, "error_injections_at_startup", ['sleep_before_start_gossiping'])
|
||||
await manager.server_update_config(servers[2].server_id, "error_injections_at_startup", ['sleep_before_start_gossiping'])
|
||||
await manager.server_start(servers[1].server_id)
|
||||
servers[1] = servers[1]._replace(ip_addr=s1_new_ip, rpc_address=s1_new_ip)
|
||||
servers[1] = ServerInfo(servers[1].server_id, s1_new_ip, s1_new_ip, servers[1].datacenter, servers[1].rack)
|
||||
if build_mode != 'release':
|
||||
s0_logs = await manager.server_open_log(servers[0].server_id)
|
||||
await s0_logs.wait_for('crash-before-prev-ip-removed hit, killing the node')
|
||||
@@ -132,7 +132,7 @@ async def test_change_two(manager, random_tables, build_mode):
|
||||
await wait_proper_ips([servers[0], servers[1]])
|
||||
|
||||
await manager.server_start(servers[2].server_id)
|
||||
servers[2] = servers[2]._replace(ip_addr=s2_new_ip, rpc_address=s2_new_ip)
|
||||
servers[2] = ServerInfo(servers[2].server_id, s2_new_ip, s2_new_ip, servers[2].datacenter, servers[2].rack)
|
||||
await reconnect_driver(manager)
|
||||
await wait_proper_ips([servers[0], servers[1], servers[2]])
|
||||
|
||||
|
||||
@@ -51,9 +51,6 @@ async def test_kill_coordinator_during_op(manager: ManagerClient) -> None:
|
||||
coordinators_ids = await get_coordinator_host_ids(manager)
|
||||
assert len(coordinators_ids) == 1, "At least 1 coordinator id should be found"
|
||||
|
||||
# Configure manager to ignore crashes caused by crash_coordinator_before_stream injection
|
||||
manager.ignore_cores_log_patterns.append("crash_coordinator_before_stream: aborting")
|
||||
|
||||
# kill coordinator during decommission
|
||||
logger.debug("Kill coordinator during decommission")
|
||||
coordinator_host = await get_coordinator_host(manager)
|
||||
|
||||
@@ -48,7 +48,7 @@ async def test_no_removed_node_event_on_ip_change(manager: ManagerClient, caplog
|
||||
with test_cluster.connect() as test_cql:
|
||||
logger.info(f"starting the follower node {servers[1]}")
|
||||
await manager.server_start(servers[1].server_id)
|
||||
servers[1] = servers[1]._replace(ip_addr=s1_new_ip, rpc_address=s1_new_ip)
|
||||
servers[1] = ServerInfo(servers[1].server_id, s1_new_ip, s1_new_ip, servers[1].datacenter, servers[1].rack)
|
||||
|
||||
logger.info("waiting for cql and hosts")
|
||||
await wait_for_cql_and_get_hosts(test_cql, servers, time.time() + 30)
|
||||
|
||||
@@ -10,7 +10,7 @@ from cassandra.policies import FallthroughRetryPolicy
|
||||
from test.pylib.internal_types import HostID, ServerInfo, ServerNum
|
||||
from test.pylib.manager_client import ManagerClient
|
||||
from test.pylib.rest_client import inject_error_one_shot, HTTPError, read_barrier
|
||||
from test.pylib.util import wait_for_cql_and_get_hosts, unique_name, wait_for
|
||||
from test.pylib.util import wait_for_cql_and_get_hosts, unique_name
|
||||
from test.pylib.tablets import get_tablet_replica, get_all_tablet_replicas, get_tablet_count, TabletReplicas
|
||||
from test.cluster.conftest import skip_mode
|
||||
from test.cluster.util import reconnect_driver, create_new_test_keyspace, new_test_keyspace
|
||||
@@ -1981,171 +1981,3 @@ async def test_timed_out_reader_after_cleanup(manager: ManagerClient):
|
||||
|
||||
rows = await cql.run_async(f"SELECT pk from {ks}.test")
|
||||
assert len(list(rows)) == 1
|
||||
|
||||
# This is a test and reproducer for https://github.com/scylladb/scylladb/issues/26041
|
||||
@pytest.mark.asyncio
|
||||
@pytest.mark.parametrize("repair_before_split", [False, True])
|
||||
@skip_mode('release', 'error injections are not supported in release mode')
|
||||
async def test_split_and_incremental_repair_synchronization(manager: ManagerClient, repair_before_split: bool):
|
||||
logger.info('Bootstrapping cluster')
|
||||
cfg = { 'enable_tablets': True,
|
||||
'tablet_load_stats_refresh_interval_in_seconds': 1
|
||||
}
|
||||
cmdline = [
|
||||
'--logger-log-level', 'load_balancer=debug',
|
||||
'--logger-log-level', 'debug_error_injection=debug',
|
||||
'--logger-log-level', 'compaction=debug',
|
||||
]
|
||||
servers = await manager.servers_add(2, cmdline=cmdline, config=cfg, auto_rack_dc="dc1")
|
||||
|
||||
cql = manager.get_cql()
|
||||
|
||||
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
|
||||
|
||||
initial_tablets = 2
|
||||
|
||||
async with new_test_keyspace(manager, f"WITH replication = {{'class': 'NetworkTopologyStrategy', 'replication_factor': 2}}") as ks:
|
||||
await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int) WITH tablets = {{'min_tablet_count': {initial_tablets}}};")
|
||||
|
||||
# insert data
|
||||
pks = range(256)
|
||||
await asyncio.gather(*[cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({k}, {k});") for k in pks])
|
||||
|
||||
# flush the table
|
||||
for server in servers:
|
||||
await manager.api.flush_keyspace(server.ip_addr, ks)
|
||||
|
||||
s0_log = await manager.server_open_log(servers[0].server_id)
|
||||
s0_mark = await s0_log.mark()
|
||||
s1_log = await manager.server_open_log(servers[1].server_id)
|
||||
s1_mark = await s1_log.mark()
|
||||
expected_tablet_count = 4 # expected tablet count post split
|
||||
|
||||
async def run_split_prepare():
|
||||
await manager.api.enable_injection(servers[0].ip_addr, 'tablet_resize_finalization_postpone', one_shot=False)
|
||||
|
||||
# force split on the test table
|
||||
await cql.run_async(f"ALTER TABLE {ks}.test WITH tablets = {{'min_tablet_count': {expected_tablet_count}}}")
|
||||
|
||||
await s0_log.wait_for('Finalizing resize decision for table', from_mark=s0_mark)
|
||||
|
||||
async def generate_repair_work():
|
||||
insert_stmt = cql.prepare(f"INSERT INTO {ks}.test (pk, c) VALUES (?, ?)")
|
||||
insert_stmt.consistency_level = ConsistencyLevel.ONE
|
||||
|
||||
await manager.api.enable_injection(servers[0].ip_addr, "database_apply", one_shot=False)
|
||||
pks = range(256, 512)
|
||||
await asyncio.gather(*[cql.run_async(insert_stmt, (k, k)) for k in pks])
|
||||
await manager.api.disable_injection(servers[0].ip_addr, "database_apply")
|
||||
|
||||
token = 'all'
|
||||
|
||||
await manager.api.enable_tablet_balancing(servers[0].ip_addr)
|
||||
|
||||
if repair_before_split:
|
||||
await generate_repair_work()
|
||||
for server in servers:
|
||||
await manager.api.enable_injection(server.ip_addr, "incremental_repair_prepare_wait", one_shot=True)
|
||||
repair_task = asyncio.create_task(manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token, incremental_mode='incremental'))
|
||||
await s0_log.wait_for('incremental_repair_prepare_wait: waiting', from_mark=s0_mark)
|
||||
await s1_log.wait_for('incremental_repair_prepare_wait: waiting', from_mark=s1_mark)
|
||||
|
||||
await run_split_prepare()
|
||||
|
||||
for server in servers:
|
||||
await manager.api.message_injection(server.ip_addr, "incremental_repair_prepare_wait")
|
||||
await repair_task
|
||||
else:
|
||||
await run_split_prepare()
|
||||
await generate_repair_work()
|
||||
await manager.api.tablet_repair(servers[0].ip_addr, ks, "test", token, incremental_mode='incremental')
|
||||
|
||||
await manager.api.disable_injection(servers[0].ip_addr, "tablet_resize_finalization_postpone")
|
||||
|
||||
async def finished_splitting():
|
||||
tablet_count = await get_tablet_count(manager, servers[0], ks, 'test')
|
||||
return tablet_count >= expected_tablet_count or None
|
||||
# Give enough time for split to happen in debug mode
|
||||
await wait_for(finished_splitting, time.time() + 120)
|
||||
|
||||
await manager.server_stop(servers[0].server_id)
|
||||
await manager.server_start(servers[0].server_id)
|
||||
hosts = await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)
|
||||
await manager.servers_see_each_other(servers)
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@skip_mode('release', 'error injections are not supported in release mode')
|
||||
async def test_split_and_intranode_synchronization(manager: ManagerClient):
|
||||
logger.info('Bootstrapping cluster')
|
||||
cfg = { 'enable_tablets': True,
|
||||
'tablet_load_stats_refresh_interval_in_seconds': 1
|
||||
}
|
||||
cmdline = [
|
||||
'--logger-log-level', 'load_balancer=debug',
|
||||
'--logger-log-level', 'debug_error_injection=debug',
|
||||
'--logger-log-level', 'compaction=debug',
|
||||
'--smp', '2',
|
||||
]
|
||||
servers = await manager.servers_add(1, cmdline=cmdline, config=cfg)
|
||||
server = servers[0]
|
||||
|
||||
cql = manager.get_cql()
|
||||
|
||||
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
|
||||
|
||||
initial_tablets = 1
|
||||
|
||||
async with new_test_keyspace(manager, f"WITH replication = {{'class': 'NetworkTopologyStrategy', 'replication_factor': 1}}") as ks:
|
||||
await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int) WITH tablets = {{'min_tablet_count': {initial_tablets}}};")
|
||||
|
||||
# insert data
|
||||
pks = range(256)
|
||||
await asyncio.gather(*[cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({k}, {k});") for k in pks])
|
||||
|
||||
# flush the table
|
||||
await manager.api.flush_keyspace(server.ip_addr, ks)
|
||||
|
||||
log = await manager.server_open_log(server.server_id)
|
||||
mark = await log.mark()
|
||||
|
||||
tablet_token = 0 # Doesn't matter since there is one tablet
|
||||
replica = await get_tablet_replica(manager, servers[0], ks, 'test', tablet_token)
|
||||
|
||||
host_id = await manager.get_host_id(server.server_id)
|
||||
src_shard = replica[1]
|
||||
|
||||
# if tablet replica is at shard 0, move it to shard 1.
|
||||
if src_shard == 0:
|
||||
dst_shard = 1
|
||||
await manager.api.move_tablet(server.ip_addr, ks, "test", replica[0], src_shard, replica[0], dst_shard, tablet_token)
|
||||
|
||||
await manager.api.enable_tablet_balancing(server.ip_addr)
|
||||
|
||||
await manager.api.enable_injection(server.ip_addr, 'tablet_resize_finalization_postpone', one_shot=False)
|
||||
await manager.api.enable_injection(server.ip_addr, "split_sstable_force_stop_exception", one_shot=False)
|
||||
|
||||
# force split on the test table
|
||||
expected_tablet_count = initial_tablets * 2
|
||||
await cql.run_async(f"ALTER TABLE {ks}.test WITH tablets = {{'min_tablet_count': {expected_tablet_count}}}")
|
||||
|
||||
# Check that shard 0 ACKed split.
|
||||
mark, _ = await log.wait_for('Setting split ready sequence number to', from_mark=mark)
|
||||
|
||||
# Move tablet replica back to shard 0, where split was already ACKed.
|
||||
src_shard = 1
|
||||
dst_shard = 0
|
||||
migration_task = asyncio.create_task(manager.api.move_tablet(server.ip_addr, ks, "test", replica[0], src_shard, replica[0], dst_shard, tablet_token))
|
||||
|
||||
mark, _ = await log.wait_for("Finished intra-node streaming of tablet", from_mark=mark)
|
||||
|
||||
await manager.api.stop_compaction(server.ip_addr, "SPLIT")
|
||||
|
||||
await migration_task
|
||||
|
||||
await manager.api.disable_injection(server.ip_addr, "tablet_resize_finalization_postpone")
|
||||
|
||||
async def finished_splitting():
|
||||
tablet_count = await get_tablet_count(manager, server, ks, 'test')
|
||||
return tablet_count >= expected_tablet_count or None
|
||||
# Give enough time for split to happen in debug mode
|
||||
await wait_for(finished_splitting, time.time() + 120)
|
||||
@@ -1,8 +1,5 @@
|
||||
add_library(test-lib STATIC)
|
||||
target_sources(test-lib
|
||||
PUBLIC
|
||||
boost_test_tree_lister.cc
|
||||
boost_tree_lister_injector.cc
|
||||
PRIVATE
|
||||
cql_assertions.cc
|
||||
dummy_sharder.cc
|
||||
|
||||
@@ -1,422 +0,0 @@
|
||||
/*
|
||||
* Copyright (C) 2025-present ScyllaDB
|
||||
*/
|
||||
|
||||
/*
|
||||
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
||||
*/
|
||||
|
||||
#include "test/lib/boost_test_tree_lister.hh"
|
||||
|
||||
#include <boost/algorithm/string/replace.hpp>
|
||||
#include <fmt/ranges.h>
|
||||
|
||||
#include <memory>
|
||||
#include <ranges>
|
||||
|
||||
namespace {
|
||||
|
||||
using label_info = internal::label_info;
|
||||
|
||||
using test_case_info = internal::test_case_info;
|
||||
using test_suite_info = internal::test_suite_info;
|
||||
using test_file_info = internal::test_file_info;
|
||||
|
||||
} // anonymous namespace
|
||||
|
||||
/// --------------------
|
||||
///
|
||||
/// Implementation notes
|
||||
///
|
||||
/// --------------------
|
||||
///
|
||||
/// The structure of the Boost.Test's test tree consists solely
|
||||
/// of nodes representing test suites and test cases. It ignores
|
||||
/// information like, for instance, the name of the file those
|
||||
/// entities reside [1].
|
||||
///
|
||||
/// What's more, a test suite can span multiple files as long
|
||||
/// as it has the same name [2].
|
||||
///
|
||||
/// We'd like to re-visualize the tree in a different manner:
|
||||
/// have a forest, where each tree represents the internal structure
|
||||
/// of a specific file. The non-leaf nodes represent test suites,
|
||||
/// and the leaves -- test cases.
|
||||
///
|
||||
/// This type achieves that very goal (albeit in a bit ugly manner).
|
||||
///
|
||||
/// ---
|
||||
///
|
||||
/// Note that the implementation suffers from the same problems
|
||||
/// Boost.Test itself does. For instance, when parametrizing tests
|
||||
/// with `boost::unit_test::data`, the test will appear as a test suite,
|
||||
/// while cases for each of the data instances -- as test cases.
|
||||
/// There's no way to overcome that, so we're stuck with it.
|
||||
///
|
||||
/// -----------
|
||||
///
|
||||
/// Assumptions
|
||||
///
|
||||
/// -----------
|
||||
///
|
||||
/// We rely on the following assumptions:
|
||||
///
|
||||
/// 1. The tree traversal is performed pre-order. That's the case for
|
||||
/// Boost.Test 1.89.0.
|
||||
/// 2. If a test case TC belong to a test suite TS (directly or indirectly),
|
||||
/// the following execution order holds:
|
||||
/// i. `test_suite_start(TC)`,
|
||||
/// ii. `visit(TC)`,
|
||||
/// iii. `test_suite_finish(TC)`.
|
||||
/// 3. If test suite TS1 is nested within test suite TS2, the following
|
||||
/// execution order holds:
|
||||
/// i. `test_suite_start(TS1)`,
|
||||
/// ii. `test_suite_start(TS2)`,
|
||||
/// iii. `test_suite_finish(TS2)`,
|
||||
/// iv. `test_suite_finish(TS1)`.
|
||||
///
|
||||
/// ----------
|
||||
///
|
||||
/// References
|
||||
///
|
||||
/// ----------
|
||||
///
|
||||
/// [1] https://www.boost.org/doc/libs/1_89_0/libs/test/doc/html/boost_test/tests_organization/test_tree.html
|
||||
/// [2] https://www.boost.org/doc/libs/1_89_0/libs/test/doc/html/boost_test/tests_organization/test_tree/test_suite.html
|
||||
///
|
||||
/// ----------------------------
|
||||
///
|
||||
/// Example of high-level output
|
||||
///
|
||||
/// ----------------------------
|
||||
///
|
||||
/// Let's consider the following organization of tests.
|
||||
///
|
||||
/// TestFile1.cc:
|
||||
/// - Suite A:
|
||||
/// - Suite A1:
|
||||
/// - Test A1.1 (labels: L1)
|
||||
/// - Test A1.2
|
||||
/// - Suite A2:
|
||||
/// - Test A2.1
|
||||
/// - Test A.1
|
||||
/// - Suite B:
|
||||
/// - Test B1
|
||||
/// - Test B2 (labels: L2, L3)
|
||||
/// - Test 1
|
||||
///
|
||||
/// TestFile2.cc:
|
||||
/// - Suite A:
|
||||
/// - Suite A3
|
||||
/// - Test A3.1
|
||||
/// - Test A.2
|
||||
/// - Suite C:
|
||||
/// - Test C.1
|
||||
/// - Test 2
|
||||
///
|
||||
/// This structure will be translated into the following JSON (we're
|
||||
/// omitting some details to make it cleaner and easier to read):
|
||||
///
|
||||
/// [
|
||||
/// {
|
||||
/// "file": "TestFile1.cc",
|
||||
/// "content": {
|
||||
/// "suites": [
|
||||
/// {
|
||||
/// "name": "A",
|
||||
/// "suites": [
|
||||
/// {
|
||||
/// "name": "A1",
|
||||
/// "suites": [],
|
||||
/// "tests": [
|
||||
/// {
|
||||
/// "name": "Test A1.1",
|
||||
/// "labels": "L1"
|
||||
/// },
|
||||
/// {
|
||||
/// "name": "Test A1.2",
|
||||
/// "labels": ""
|
||||
/// }
|
||||
/// ]
|
||||
/// }
|
||||
/// ],
|
||||
/// "tests": [
|
||||
/// {
|
||||
/// "name": "Test1",
|
||||
/// "labels": ""
|
||||
/// }
|
||||
/// ]
|
||||
/// },
|
||||
/// {
|
||||
/// "name": "B",
|
||||
/// "suites": [],
|
||||
/// "tests": [
|
||||
/// {
|
||||
/// "name": "Test B1",
|
||||
/// "labels": ""
|
||||
/// },
|
||||
/// {
|
||||
/// "name": "Test B2",
|
||||
/// "labels": "L2,L3"
|
||||
/// },
|
||||
/// ]
|
||||
/// }
|
||||
/// ],
|
||||
/// "tests": [
|
||||
/// {
|
||||
/// "name": "Test 1",
|
||||
/// "labels": ""
|
||||
/// }
|
||||
/// ]
|
||||
/// }
|
||||
/// },
|
||||
/// {
|
||||
/// "file": "TestFile2.cc",
|
||||
/// "content": {
|
||||
/// "suites": [
|
||||
/// {
|
||||
/// "name": "A",
|
||||
/// "suites": [
|
||||
/// {
|
||||
/// "name": "A3",
|
||||
/// "suites": [],
|
||||
/// "tests": [
|
||||
/// {
|
||||
/// "name": "Test A3.1",
|
||||
/// "labels": ""
|
||||
/// }
|
||||
/// ]
|
||||
/// }
|
||||
/// ],
|
||||
/// "tests": [
|
||||
/// {
|
||||
/// "name": "Test A.2",
|
||||
/// "labels": ""
|
||||
/// }
|
||||
/// ]
|
||||
/// },
|
||||
/// {
|
||||
/// "name": "C",
|
||||
/// "suites": [],
|
||||
/// "tests": [
|
||||
/// {
|
||||
/// "name": "Test C.1",
|
||||
/// "labels": ""
|
||||
/// }
|
||||
/// ]
|
||||
/// }
|
||||
/// ],
|
||||
/// "tests": [
|
||||
/// {
|
||||
/// "name": "Test 2",
|
||||
/// "labels": ""
|
||||
/// }
|
||||
/// ]
|
||||
/// }
|
||||
/// }
|
||||
/// ]
|
||||
///
|
||||
/// Note that although Boost.Test treats Suite A in TestFile1.cc
|
||||
/// and Suite A in TestFile2.cc as the SAME suite, we consider it
|
||||
/// separately for each of the files it resides in.
|
||||
struct boost_test_tree_lister::impl {
|
||||
public:
|
||||
/// The final result we're building while traversing the test tree.
|
||||
test_file_forest file_forest;
|
||||
/// The path from the root to the current suite.
|
||||
std::vector<std::string> active_suites;
|
||||
|
||||
public:
|
||||
void process_test_case(const boost::unit_test::test_case& tc) {
|
||||
const std::string_view filename = {tc.p_file_name.begin(), tc.p_file_name.end()};
|
||||
test_file_info& test_file = get_file_info(filename);
|
||||
|
||||
std::string test_name = tc.p_name;
|
||||
std::vector<label_info> labels = tc.p_labels.get();
|
||||
|
||||
test_case_info test_info {.name = std::move(test_name), .labels = std::move(labels)};
|
||||
|
||||
if (active_suites.empty()) {
|
||||
test_file.free_tests.push_back(std::move(test_info));
|
||||
} else {
|
||||
test_suite_info& suite_info = get_active_suite(filename);
|
||||
suite_info.tests.push_back(std::move(test_info));
|
||||
}
|
||||
}
|
||||
|
||||
bool test_suite_start(const boost::unit_test::test_suite& ts) {
|
||||
// The suite is the master test suite, so let's ignore it
|
||||
// because it doesn't represent any actual test suite.
|
||||
if (ts.p_parent_id == boost::unit_test::INV_TEST_UNIT_ID) {
|
||||
assert(active_suites.empty());
|
||||
return true;
|
||||
}
|
||||
|
||||
std::string suite_name = ts.p_name.value;
|
||||
add_active_suite(std::move(suite_name));
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
void test_suite_finish(const boost::unit_test::test_suite& ts) {
|
||||
// The suite is the master test suite, so let's ignore it
|
||||
// because it doesn't represent any actual test suite.
|
||||
if (ts.p_parent_id == boost::unit_test::INV_TEST_UNIT_ID) {
|
||||
assert(active_suites.empty());
|
||||
return;
|
||||
}
|
||||
|
||||
drop_active_suite();
|
||||
}
|
||||
|
||||
private:
|
||||
test_file_info& get_file_info(std::string_view filename) {
|
||||
auto& test_files = file_forest.test_files;
|
||||
|
||||
auto it = test_files.find(filename);
|
||||
if (it == test_files.end()) {
|
||||
std::tie(it, std::ignore) = test_files.emplace(filename, std::vector<test_suite_info>{});
|
||||
}
|
||||
|
||||
return it->second;
|
||||
}
|
||||
|
||||
void add_active_suite(std::string suite_name) {
|
||||
active_suites.push_back(std::move(suite_name));
|
||||
}
|
||||
|
||||
void drop_active_suite() {
|
||||
assert(!active_suites.empty());
|
||||
active_suites.pop_back();
|
||||
}
|
||||
|
||||
test_suite_info& get_active_suite(std::string_view filename) {
|
||||
assert(!active_suites.empty());
|
||||
|
||||
test_file_info& file_info = get_file_info(filename);
|
||||
test_suite_info* last = &get_root_suite(file_info, active_suites[0]);
|
||||
|
||||
for (const auto& suite_name : active_suites | std::views::drop(1)) {
|
||||
last = &get_subsuite(*last, suite_name);
|
||||
}
|
||||
|
||||
return *last;
|
||||
}
|
||||
|
||||
test_suite_info& get_root_suite(test_file_info& file_info, std::string_view suite_name) {
|
||||
auto suite_it = std::ranges::find(file_info.suites, suite_name, &test_suite_info::name);
|
||||
if (suite_it != file_info.suites.end()) {
|
||||
return *suite_it;
|
||||
}
|
||||
|
||||
test_suite_info suite_info {.name = std::string(suite_name)};
|
||||
file_info.suites.push_back(std::move(suite_info));
|
||||
|
||||
return *file_info.suites.rbegin();
|
||||
}
|
||||
|
||||
test_suite_info& get_subsuite(test_suite_info& parent, std::string_view suite_name) {
|
||||
auto suite_it = std::ranges::find(parent.subsuites, suite_name, [] (auto&& suite_ptr) -> std::string_view {
|
||||
return suite_ptr->name;
|
||||
});
|
||||
|
||||
if (suite_it != parent.subsuites.end()) {
|
||||
return **suite_it;
|
||||
}
|
||||
|
||||
auto suite = std::make_unique<test_suite_info>(std::string(suite_name));
|
||||
parent.subsuites.push_back(std::move(suite));
|
||||
|
||||
return **parent.subsuites.rbegin();
|
||||
}
|
||||
};
|
||||
|
||||
boost_test_tree_lister::boost_test_tree_lister() : _impl(std::make_unique<impl>()) {}
|
||||
boost_test_tree_lister::~boost_test_tree_lister() noexcept = default;
|
||||
|
||||
const test_file_forest& boost_test_tree_lister::get_result() const {
|
||||
return _impl->file_forest;
|
||||
}
|
||||
|
||||
void boost_test_tree_lister::visit(const boost::unit_test::test_case& tc) {
|
||||
return _impl->process_test_case(tc);
|
||||
}
|
||||
|
||||
bool boost_test_tree_lister::test_suite_start(const boost::unit_test::test_suite& ts) {
|
||||
return _impl->test_suite_start(ts);
|
||||
}
|
||||
|
||||
void boost_test_tree_lister::test_suite_finish(const boost::unit_test::test_suite& ts) {
|
||||
return _impl->test_suite_finish(ts);
|
||||
}
|
||||
|
||||
// Replace every occurrenace of a double quotation mark (`"`) with a string `\"`.
|
||||
static std::string escape_quotation_marks(std::string_view str) {
|
||||
const std::size_t double_quotation_count = std::ranges::count(str, '"');
|
||||
std::string result(str.size() + double_quotation_count, '\\');
|
||||
|
||||
std::size_t offset = 0;
|
||||
for (std::size_t i = 0; i < str.size(); ++i) {
|
||||
if (str[i] == '"') {
|
||||
result[i + offset] = '\\';
|
||||
++offset;
|
||||
}
|
||||
result[i + offset] = str[i];
|
||||
}
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
auto fmt::formatter<internal::test_case_info>::format(
|
||||
const internal::test_case_info& test_info,
|
||||
fmt::format_context& ctx) const -> decltype(ctx.out())
|
||||
{
|
||||
// Sanity check. The names of tests are expected to comprise only of alphanumeric characters.
|
||||
assert(std::ranges::count(test_info.name, '"') == 0);
|
||||
auto label_range = test_info.labels | std::views::transform(escape_quotation_marks);
|
||||
|
||||
return fmt::format_to(ctx.out(), R"({{"name":"{}","labels":"{}"}})",
|
||||
test_info.name, fmt::join(label_range, ","));
|
||||
}
|
||||
|
||||
auto fmt::formatter<internal::test_suite_info>::format(
|
||||
const internal::test_suite_info& suite_info,
|
||||
fmt::format_context& ctx) const -> decltype(ctx.out())
|
||||
{
|
||||
auto actual_suite_range = suite_info.subsuites | std::views::transform([] (auto&& ptr) -> const test_suite_info& {
|
||||
return *ptr;
|
||||
});
|
||||
auto suite_range = fmt::join(actual_suite_range, ",");
|
||||
auto test_range = fmt::join(suite_info.tests, ",");
|
||||
return fmt::format_to(ctx.out(), R"({{"name":"{}","suites":[{}],"tests":[{}]}})",
|
||||
suite_info.name, std::move(suite_range), std::move(test_range));
|
||||
}
|
||||
|
||||
auto fmt::formatter<internal::test_file_info>::format(
|
||||
const internal::test_file_info& file_info,
|
||||
fmt::format_context& ctx) const -> decltype(ctx.out())
|
||||
{
|
||||
auto suite_range = fmt::join(file_info.suites, ",");
|
||||
auto test_range = fmt::join(file_info.free_tests, ",");
|
||||
return fmt::format_to(ctx.out(), R"({{"suites":[{}],"tests":[{}]}})",
|
||||
std::move(suite_range), std::move(test_range));
|
||||
}
|
||||
|
||||
auto fmt::formatter<internal::test_file_forest>::format(
|
||||
const internal::test_file_forest& forest_info,
|
||||
fmt::format_context& ctx) const -> decltype(ctx.out())
|
||||
{
|
||||
std::size_t files_left = forest_info.test_files.size();
|
||||
|
||||
fmt::format_to(ctx.out(), "[");
|
||||
for (const auto& [file, content] : forest_info.test_files) {
|
||||
fmt::format_to(ctx.out(), R"({{"file":"{}","content":{}}})",
|
||||
file, content);
|
||||
if (files_left > 1) {
|
||||
fmt::format_to(ctx.out(), ",");
|
||||
}
|
||||
--files_left;
|
||||
|
||||
}
|
||||
return fmt::format_to(ctx.out(), "]");
|
||||
}
|
||||
@@ -1,117 +0,0 @@
|
||||
/*
|
||||
* Copyright (C) 2025-present ScyllaDB
|
||||
*/
|
||||
|
||||
/*
|
||||
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
||||
*/
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <boost/test/tree/visitor.hpp>
|
||||
|
||||
#include <fmt/base.h>
|
||||
|
||||
#include <memory>
|
||||
|
||||
namespace internal {
|
||||
|
||||
using label_info = std::string;
|
||||
|
||||
/// Type representing a single Boost test case.
|
||||
struct test_case_info {
|
||||
/// The name of the test case.
|
||||
std::string name;
|
||||
/// The labels the test was marked with.
|
||||
std::vector<label_info> labels;
|
||||
};
|
||||
|
||||
/// Type representing a single Boost test suite within a single file.
|
||||
///
|
||||
/// Note that a single suite can span multiple files (as of Boost.Test 1.89.0); see:
|
||||
/// https://www.boost.org/doc/libs/1_89_0/libs/test/doc/html/boost_test/tests_organization/test_tree/test_suite.html.
|
||||
///
|
||||
/// We turn away from that convention and list suites from different files separately.
|
||||
/// However, that doesn't change the fact that it's still the same suite from the
|
||||
/// perspective of Boost.Test. In particular, if a suite is marked with a label,
|
||||
/// it's applied to it globally.
|
||||
struct test_suite_info {
|
||||
std::string name;
|
||||
std::vector<std::unique_ptr<test_suite_info>> subsuites;
|
||||
/// The tests belonging directly to this suite.
|
||||
std::vector<test_case_info> tests;
|
||||
};
|
||||
|
||||
struct test_file_info {
|
||||
std::vector<test_suite_info> suites;
|
||||
std::vector<test_case_info> free_tests;
|
||||
};
|
||||
|
||||
struct test_file_forest {
|
||||
std::map<std::string, test_file_info, std::less<>> test_files;
|
||||
};
|
||||
|
||||
} // namespace internal
|
||||
|
||||
using test_file_forest = internal::test_file_forest;
|
||||
|
||||
/// Implementation of the `boost::unit_test::test_tree_visitor` that
|
||||
/// produces a similar result to running a Boost.Test executable with
|
||||
/// `--list_content=HRF` or `--list_content=DOT`. This type results
|
||||
/// in the JSON format of the output.
|
||||
///
|
||||
/// The crucial difference between this implementation and the built-in
|
||||
/// HRF and DOT ones is that the result obtained by a call to `get_result()`
|
||||
/// (after the traversal has finished) is going to have a different structure.
|
||||
///
|
||||
/// The type `boost_test_tree_lister` will treat the same suite from different
|
||||
/// files as separate ones, even if they share the name. Boost.Test would treat
|
||||
/// them as the same one and group the results by suites. In other words,
|
||||
/// this type groups results by (in order):
|
||||
///
|
||||
/// 1. File
|
||||
/// 2. Suite(s)
|
||||
/// 3. Test cases
|
||||
class boost_test_tree_lister : public boost::unit_test::test_tree_visitor {
|
||||
private:
|
||||
struct impl;
|
||||
|
||||
private:
|
||||
std::unique_ptr<impl> _impl;
|
||||
|
||||
public:
|
||||
boost_test_tree_lister();
|
||||
~boost_test_tree_lister() noexcept;
|
||||
|
||||
public:
|
||||
const test_file_forest& get_result() const;
|
||||
|
||||
private:
|
||||
virtual void visit(const boost::unit_test::test_case&) override;
|
||||
virtual bool test_suite_start(const boost::unit_test::test_suite&) override;
|
||||
virtual void test_suite_finish(const boost::unit_test::test_suite&) override;
|
||||
};
|
||||
|
||||
template <>
|
||||
struct fmt::formatter<internal::test_case_info> {
|
||||
constexpr auto parse(format_parse_context& ctx) { return ctx.begin(); }
|
||||
auto format(const internal::test_case_info&, fmt::format_context& ctx) const -> decltype(ctx.out());
|
||||
};
|
||||
|
||||
template <>
|
||||
struct fmt::formatter<internal::test_suite_info> {
|
||||
constexpr auto parse(format_parse_context& ctx) { return ctx.begin(); }
|
||||
auto format(const internal::test_suite_info&, fmt::format_context& ctx) const -> decltype(ctx.out());
|
||||
};
|
||||
|
||||
template <>
|
||||
struct fmt::formatter<internal::test_file_info> {
|
||||
constexpr auto parse(format_parse_context& ctx) { return ctx.begin(); }
|
||||
auto format(const internal::test_file_info&, fmt::format_context& ctx) const -> decltype(ctx.out());
|
||||
};
|
||||
|
||||
template <>
|
||||
struct fmt::formatter<internal::test_file_forest> {
|
||||
constexpr auto parse(format_parse_context& ctx) { return ctx.begin(); }
|
||||
auto format(const internal::test_file_forest&, fmt::format_context& ctx) const -> decltype(ctx.out());
|
||||
};
|
||||
@@ -1,115 +0,0 @@
|
||||
/*
|
||||
* Copyright (C) 2025-present ScyllaDB
|
||||
*/
|
||||
|
||||
/*
|
||||
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
||||
*/
|
||||
|
||||
#include "test/lib/boost_test_tree_lister.hh"
|
||||
|
||||
#include <boost/test/framework.hpp>
|
||||
#include <boost/test/tree/traverse.hpp>
|
||||
#include <boost/test/unit_test_suite.hpp>
|
||||
|
||||
#include <fmt/core.h>
|
||||
|
||||
namespace {
|
||||
|
||||
/// Traverse the test tree and collect information about
|
||||
/// its structure and the tests.
|
||||
///
|
||||
/// The output is going to be in the JSON format.
|
||||
/// For more details, see the implementation of
|
||||
/// `boost_test_tree_lister`.
|
||||
void print_boost_tests() {
|
||||
namespace but = boost::unit_test;
|
||||
|
||||
but::framework::finalize_setup_phase();
|
||||
|
||||
boost_test_tree_lister traverser;
|
||||
but::traverse_test_tree(but::framework::master_test_suite().p_id, traverser, true);
|
||||
|
||||
fmt::print("{}", traverser.get_result());
|
||||
}
|
||||
|
||||
/// --------
|
||||
/// Examples
|
||||
/// --------
|
||||
///
|
||||
/// # This will NOT list the tests because Boost.Test
|
||||
/// # will interpret it as an argument to the framework.
|
||||
/// $ ./path/to/my/test/exec --list_json_content
|
||||
///
|
||||
/// # This will NOT list the tests because Boost.Test requires
|
||||
/// # that all non-Boost.Test arguments be provided AFTER
|
||||
/// # a `--` sequence (cf. example below).
|
||||
/// $ ./path/to/my/test/exec list_json_content
|
||||
///
|
||||
/// # This will NOT list the tests because Boost.Test because
|
||||
/// # the option simply doesn't match the exepected one.
|
||||
/// $ ./path/to/my/test/exec list_json_content
|
||||
///
|
||||
/// # This DOES work and DOES what we expect, i.e. it lists the tests.
|
||||
/// $ ./path/to/my/test/exec -- --list_json_content
|
||||
bool list_tests(int argc, char** argv) {
|
||||
for (int i = 1; i < argc; ++i) {
|
||||
std::string_view option = argv[i];
|
||||
if (option == "--list_json_content") {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
struct boost_tree_lister_injector {
|
||||
boost_tree_lister_injector() {
|
||||
const auto& master_suite = boost::unit_test::framework::master_test_suite();
|
||||
/// The arguments here don't include Boost.Test-specific arguments.
|
||||
/// Those present correspond to the path to the binary and options
|
||||
/// specified for the "end-code".
|
||||
///
|
||||
/// --------
|
||||
/// Examples
|
||||
/// --------
|
||||
/// $ ./path/to/my/test/exec my_custom_arg
|
||||
/// Arguments: [<path>, "my_custom_arg"]
|
||||
///
|
||||
/// $ ./path/to/my/test/exec -- my_custom_arg
|
||||
/// Arguments: [<path>, "my_custom_arg"]
|
||||
///
|
||||
/// $ ./path/to/my/test/exec --auto_start_dbg=0 -- my_custom_arg
|
||||
/// Arguments: [<path>, "my_custom_arg"]
|
||||
///
|
||||
/// $ ./path/to/my/test/exec --auto_start_dbg=0 my_custom_arg
|
||||
/// Arguments: [<path>, "my_custom_arg"]
|
||||
///
|
||||
/// ------------------------------------------
|
||||
/// Interaction with some Boost.Test arguments
|
||||
/// ------------------------------------------
|
||||
///
|
||||
/// Note, however, that some Boost.Test options may prevent us
|
||||
/// from accessing this code. For instance, if the user runs
|
||||
///
|
||||
/// $ ./path/to/my/test/exec --list_content -- my_custom_arg
|
||||
///
|
||||
/// then Boost.Test will immediately move to its own code and not
|
||||
/// execute this one (because it's only called by a global fixture).
|
||||
auto&& [argc, argv] = std::make_pair(master_suite.argc, master_suite.argv);
|
||||
|
||||
if (list_tests(argc, argv)) {
|
||||
print_boost_tests();
|
||||
|
||||
// At this point, it's impossible to prevent Boost.Test
|
||||
// from executing the tests it collected. This is all
|
||||
// we can do (at least without writing a lot more code.
|
||||
// I don't know if it would still be possible to avoid it).
|
||||
std::exit(0);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
} // anonymous namespace

// Register the lister as a Boost.Test global fixture so that its
// constructor runs during framework setup, before any test case executes.
BOOST_GLOBAL_FIXTURE(boost_tree_lister_injector);
|
||||
@@ -91,7 +91,7 @@ public:
|
||||
sstables::sstables_manager& get_sstables_manager() noexcept override {
|
||||
return _sstables_manager;
|
||||
}
|
||||
sstables::shared_sstable make_sstable(sstables::sstable_state) const override {
|
||||
sstables::shared_sstable make_sstable() const override {
|
||||
return table().make_sstable();
|
||||
}
|
||||
sstables::sstable_writer_config configure_writer(sstring origin) const override {
|
||||
|
||||
@@ -385,7 +385,7 @@ def test_repair_options_hosts_and_dcs_tablets(nodetool, datacenter, hosts):
|
||||
[("--tablet-tokens", "1")],
|
||||
[("--tablet-tokens", "-1,2")],
|
||||
[("--tablet-tokens", "-1"), ("--tablet-tokens", "2")]])
|
||||
def test_repair_options_tokens_tablets(nodetool, tokens):
|
||||
def test_repair_options_hosts_tablets(nodetool, tokens):
|
||||
_do_test_repair_options_tablets(nodetool, tokens=tokens)
|
||||
|
||||
def test_repair_all_with_vnode_keyspace(nodetool):
|
||||
|
||||
@@ -623,7 +623,7 @@ Repair session 1
|
||||
Repair session 1 finished
|
||||
"""
|
||||
|
||||
def test_repair_keyspace_failure(nodetool):
|
||||
def test_repair_keyspace(nodetool):
|
||||
check_nodetool_fails_with(
|
||||
nodetool,
|
||||
("repair", "ks"),
|
||||
|
||||
@@ -29,9 +29,9 @@ class KeyProvider(Enum):
|
||||
|
||||
class KeyProviderFactory:
|
||||
"""Base class for provider factories"""
|
||||
def __init__(self, key_provider : KeyProvider, tmpdir):
|
||||
def __init__(self, key_provider : KeyProvider):
|
||||
self.key_provider = key_provider
|
||||
self.system_key_location = os.path.join(tmpdir, "resources/system_keys")
|
||||
self.system_keyfile = None
|
||||
|
||||
async def __aenter__(self):
|
||||
return self
|
||||
@@ -50,7 +50,7 @@ class KeyProviderFactory:
|
||||
|
||||
def configuration_parameters(self) -> dict[str, str]:
|
||||
"""scylla.conf entries for provider"""
|
||||
return {"system_key_directory": self.system_key_location}
|
||||
return {}
|
||||
|
||||
def additional_cf_options(self) -> dict[str, str]:
|
||||
# pylint: disable=unused-argument
|
||||
@@ -62,7 +62,7 @@ class KeyProviderFactory:
|
||||
class LocalFileSystemKeyProviderFactory(KeyProviderFactory):
|
||||
"""LocalFileSystemKeyProviderFactory proxy"""
|
||||
def __init__(self, tmpdir):
|
||||
super(LocalFileSystemKeyProviderFactory, self).__init__(KeyProvider.local, tmpdir)
|
||||
super(LocalFileSystemKeyProviderFactory, self).__init__( KeyProvider.local)
|
||||
self.secret_file = os.path.join(tmpdir, "test/node1/conf/data_encryption_keys")
|
||||
|
||||
def additional_cf_options(self) -> dict[str, str]:
|
||||
@@ -72,7 +72,8 @@ class LocalFileSystemKeyProviderFactory(KeyProviderFactory):
|
||||
class ReplicatedKeyProviderFactory(KeyProviderFactory):
|
||||
"""ReplicatedKeyProviderFactory proxy"""
|
||||
def __init__(self, tmpdir):
|
||||
super(ReplicatedKeyProviderFactory, self).__init__(KeyProvider.replicated, tmpdir)
|
||||
super(ReplicatedKeyProviderFactory, self).__init__( KeyProvider.replicated)
|
||||
self.system_key_location = os.path.join(tmpdir, "resources/system_keys")
|
||||
self.system_key_file_name = "system_key"
|
||||
|
||||
async def __aenter__(self):
|
||||
@@ -87,13 +88,17 @@ class ReplicatedKeyProviderFactory(KeyProviderFactory):
|
||||
raise RuntimeError(f'Could not generate system key: {stderr.decode()}')
|
||||
return self
|
||||
|
||||
def configuration_parameters(self) -> dict[str, str]:
|
||||
"""scylla.conf entries for provider"""
|
||||
return super().configuration_parameters() | {"system_key_directory": self.system_key_location}
|
||||
|
||||
def additional_cf_options(self):
|
||||
return super().additional_cf_options() | {"system_key": self.system_key_file_name}
|
||||
|
||||
class KmipKeyProviderFactory(KeyProviderFactory):
|
||||
"""KmipKeyProviderFactory proxy"""
|
||||
def __init__(self, tmpdir):
|
||||
super(KmipKeyProviderFactory, self).__init__(KeyProvider.kmip, tmpdir)
|
||||
super(KmipKeyProviderFactory, self).__init__( KeyProvider.kmip)
|
||||
self.tmpdir = tmpdir
|
||||
self.kmip_server_wrapper = None
|
||||
self.kmip_host = "kmip_test"
|
||||
@@ -173,7 +178,7 @@ class KmipKeyProviderFactory(KeyProviderFactory):
|
||||
class KMSKeyProviderFactory(KeyProviderFactory):
|
||||
"""KMSKeyProviderFactory proxy"""
|
||||
def __init__(self, tmpdir):
|
||||
super(KMSKeyProviderFactory, self).__init__(KeyProvider.kms, tmpdir)
|
||||
super(KMSKeyProviderFactory, self).__init__( KeyProvider.kms)
|
||||
self.tmpdir = tmpdir
|
||||
self.master_key = "alias/Scylla-test"
|
||||
self.kms_host = "kms_test"
|
||||
@@ -255,7 +260,7 @@ class KMSKeyProviderFactory(KeyProviderFactory):
|
||||
class AzureKeyProviderFactory(KeyProviderFactory):
|
||||
"""AzureKeyProviderFactory proxy"""
|
||||
def __init__(self, tmpdir):
|
||||
super(AzureKeyProviderFactory, self).__init__(KeyProvider.azure, tmpdir)
|
||||
super(AzureKeyProviderFactory, self).__init__( KeyProvider.azure)
|
||||
self.tmpdir = tmpdir
|
||||
self.azure_host = "azure_test"
|
||||
self.azure_server = None
|
||||
|
||||
@@ -22,13 +22,12 @@ class ServerInfo(NamedTuple):
|
||||
rpc_address: IPAddress
|
||||
datacenter: str
|
||||
rack: str
|
||||
pid: int
|
||||
|
||||
def __str__(self):
|
||||
return f"Server({self.server_id}, {self.ip_addr}, {self.rpc_address}, {self.datacenter}, {self.rack}, {self.pid})"
|
||||
|
||||
return f"Server({self.server_id}, {self.ip_addr}, {self.rpc_address}, {self.datacenter}, {self.rack})"
|
||||
|
||||
def as_dict(self) -> dict[str, object]:
|
||||
return {"server_id": self.server_id, "ip_addr": self.ip_addr, "rpc_address": self.rpc_address, "datacenter": self.datacenter, "rack": self.rack, "pid": self.pid}
|
||||
return {"server_id": self.server_id, "ip_addr": self.ip_addr, "rpc_address": self.rpc_address, "datacenter": self.datacenter, "rack": self.rack}
|
||||
|
||||
def property_file(self) -> dict[str, str]:
|
||||
return {"dc": self.datacenter, "rack": self.rack}
|
||||
|
||||
@@ -167,48 +167,3 @@ class ScyllaLogFile:
|
||||
line = await self._run_in_executor(log_file.readline, loop=loop)
|
||||
|
||||
return matches
|
||||
|
||||
async def find_backtraces(self, from_mark: int | None = None) -> list[str]:
|
||||
"""
|
||||
Find and extract all backtraces from the log file.
|
||||
|
||||
Each backtrace starts with a line "Backtrace:" followed by lines that start with exactly 2 spaces.
|
||||
If `from_mark` argument is given, the log is searched from that position, otherwise from the beginning.
|
||||
Return a list of strings, where each string is a complete backtrace (all lines joined together).
|
||||
"""
|
||||
loop = asyncio.get_running_loop()
|
||||
|
||||
backtraces = []
|
||||
|
||||
with self.file.open(encoding="utf-8") as log_file:
|
||||
if from_mark:
|
||||
await self._run_in_executor(log_file.seek, from_mark, loop=loop)
|
||||
|
||||
line = await self._run_in_executor(log_file.readline, loop=loop)
|
||||
while line:
|
||||
if line.strip() == "Backtrace:":
|
||||
# Found a backtrace, collect all lines that start with exactly 2 spaces
|
||||
backtrace_lines = [line]
|
||||
while True:
|
||||
next_line = await self._run_in_executor(log_file.readline, loop=loop)
|
||||
if not next_line:
|
||||
# End of file
|
||||
break
|
||||
if next_line.startswith(" ") and not next_line.startswith(" "):
|
||||
# Line starts with exactly 2 spaces (backtrace entry)
|
||||
backtrace_lines.append(next_line)
|
||||
else:
|
||||
# End of backtrace
|
||||
line = next_line
|
||||
break
|
||||
|
||||
if backtrace_lines:
|
||||
# Join all backtrace lines into a single string
|
||||
backtraces.append(''.join(backtrace_lines))
|
||||
|
||||
# Continue from current line (already read in the inner loop)
|
||||
continue
|
||||
|
||||
line = await self._run_in_executor(log_file.readline, loop=loop)
|
||||
|
||||
return backtraces
|
||||
|
||||
@@ -8,9 +8,7 @@
|
||||
Provides helper methods to test cases.
|
||||
Manages driver refresh when cluster is cycled.
|
||||
"""
|
||||
from collections import defaultdict
|
||||
import pathlib
|
||||
import re
|
||||
import shutil
|
||||
from concurrent.futures import ThreadPoolExecutor
|
||||
from pathlib import Path
|
||||
@@ -66,8 +64,6 @@ class ManagerClient:
|
||||
self.metrics = ScyllaMetricsClient()
|
||||
self.thread_pool = ThreadPoolExecutor()
|
||||
self.test_finished_event = asyncio.Event()
|
||||
self.ignore_log_patterns = [] # patterns to ignore in server logs when checking for errors
|
||||
self.ignore_cores_log_patterns = [] # patterns to ignore in server logs when checking for core files
|
||||
|
||||
@property
|
||||
def client(self):
|
||||
@@ -183,82 +179,6 @@ class ManagerClient:
|
||||
logger.info("Cluster after test %s: %s", test_case_name, cluster_status)
|
||||
|
||||
return cluster_status
|
||||
|
||||
async def check_all_errors(self, check_all_errors=False) -> dict[ServerInfo, dict[str, Union[list[str], list[str], Path, list[str]]]]:
|
||||
|
||||
errors = defaultdict(dict)
|
||||
# find errors in logs
|
||||
for server in await self.all_servers():
|
||||
log_file = await self.server_open_log(server_id=server.server_id)
|
||||
# check if we should ignore cores on this server
|
||||
ignore_cores = []
|
||||
if self.ignore_cores_log_patterns:
|
||||
if matches := log_file.grep("|".join(f"({p})" for p in set(self.ignore_cores_log_patterns))):
|
||||
logger.debug(f"Will ignore cores on {server}. Found the following log messages: {matches}")
|
||||
ignore_cores.append(server)
|
||||
critical_error_pattern = r"Assertion.*failed|AddressSanitizer"
|
||||
if server not in ignore_cores:
|
||||
critical_error_pattern += "|Aborting on shard"
|
||||
if found_critical := await log_file.grep(critical_error_pattern):
|
||||
errors[server]["critical"] = [e[0] for e in found_critical]
|
||||
# Find the backtraces for the critical errors
|
||||
if found_backtraces := await log_file.find_backtraces():
|
||||
errors[server]["backtraces"] = found_backtraces
|
||||
if check_all_errors:
|
||||
if found_errors := await log_file.grep_for_errors(distinct_errors=True):
|
||||
if filtered_errors := await self.filter_errors(found_errors):
|
||||
errors[server]["error"] = filtered_errors
|
||||
# find core files
|
||||
for server, cores in (await self.find_cores()).items():
|
||||
errors[server]["cores"] = cores
|
||||
# add log file path to the report for servers that had errors or cores
|
||||
for server in await self.all_servers():
|
||||
log_file = await self.server_open_log(server_id=server.server_id)
|
||||
if server in errors:
|
||||
errors[server]["log"] = log_file.file.name
|
||||
|
||||
return errors
|
||||
|
||||
async def filter_errors(self, errors: list[str]):
|
||||
exclude_errors_pattern = re.compile("|".join(f"{p}" for p in {
|
||||
*self.ignore_log_patterns,
|
||||
*self.ignore_cores_log_patterns,
|
||||
|
||||
r"Compaction for .* deliberately stopped",
|
||||
r"update compaction history failed:.*ignored",
|
||||
|
||||
# We may stop nodes that have not finished starting yet.
|
||||
r"(Startup|start) failed:.*(seastar::sleep_aborted|raft::request_aborted)",
|
||||
r"Timer callback failed: seastar::gate_closed_exception",
|
||||
|
||||
# Ignore expected RPC errors when nodes are stopped.
|
||||
r"rpc - client .*(connection dropped|fail to connect)",
|
||||
|
||||
# We see benign RPC errors when nodes start/stop.
|
||||
# If they cause system malfunction, it should be detected using higher-level tests.
|
||||
r"rpc::unknown_verb_error",
|
||||
r"raft_rpc - Failed to send",
|
||||
r"raft_topology.*(seastar::broken_promise|rpc::closed_error)",
|
||||
|
||||
# Expected tablet migration stream failure where a node is stopped.
|
||||
# Refs: https://github.com/scylladb/scylladb/issues/19640
|
||||
r"Failed to handle STREAM_MUTATION_FRAGMENTS.*rpc::stream_closed",
|
||||
|
||||
# Expected Raft errors on decommission-abort or node restart with MV.
|
||||
r"raft_topology - raft_topology_cmd.*failed with: raft::request_aborted",
|
||||
}))
|
||||
return [e for e in errors if not exclude_errors_pattern.search(e)]
|
||||
|
||||
async def find_cores(self) -> dict[ServerInfo, list[str]]:
|
||||
"""Find core files on all servers"""
|
||||
# find *.core files in current dir
|
||||
cores = [str(core_file.absolute()) for core_file in pathlib.Path('.').glob('*.core')]
|
||||
server_cores = dict()
|
||||
# match core files to servers by pid
|
||||
for server in await self.all_servers():
|
||||
if found_cores := [core for core in cores if f".{server.pid}." in core]:
|
||||
server_cores[server] = found_cores
|
||||
return server_cores
|
||||
|
||||
async def gather_related_logs(self, failed_test_path_dir: Path, logs: Dict[str, Path]) -> None:
|
||||
for server in await self.all_servers():
|
||||
@@ -292,7 +212,8 @@ class ManagerClient:
|
||||
except RuntimeError as exc:
|
||||
raise Exception("Failed to get list of running servers") from exc
|
||||
assert isinstance(server_info_list, list), "running_servers got unknown data type"
|
||||
return [ServerInfo(*info) for info in server_info_list]
|
||||
return [ServerInfo(ServerNum(int(info[0])), IPAddress(info[1]), IPAddress(info[2]), info[3], info[4])
|
||||
for info in server_info_list]
|
||||
|
||||
async def all_servers(self) -> list[ServerInfo]:
|
||||
"""Get List of server info (id and IP address) of all servers"""
|
||||
@@ -301,7 +222,8 @@ class ManagerClient:
|
||||
except RuntimeError as exc:
|
||||
raise Exception("Failed to get list of servers") from exc
|
||||
assert isinstance(server_info_list, list), "all_servers got unknown data type"
|
||||
return [ServerInfo(*info) for info in server_info_list]
|
||||
return [ServerInfo(ServerNum(int(info[0])), IPAddress(info[1]), IPAddress(info[2]), info[3], info[4])
|
||||
for info in server_info_list]
|
||||
|
||||
async def starting_servers(self) -> list[ServerInfo]:
|
||||
"""Get List of server info (id and IP address) of servers currently
|
||||
@@ -314,7 +236,8 @@ class ManagerClient:
|
||||
except RuntimeError as exc:
|
||||
raise Exception("Failed to get list of starting servers") from exc
|
||||
assert isinstance(server_info_list, list), "starting_servers got unknown data type"
|
||||
return [ServerInfo(*info) for info in server_info_list]
|
||||
return [ServerInfo(ServerNum(int(info[0])), IPAddress(info[1]), IPAddress(info[2]), info[3], info[4])
|
||||
for info in server_info_list]
|
||||
|
||||
async def mark_dirty(self) -> None:
|
||||
"""Manually mark current cluster dirty.
|
||||
@@ -353,9 +276,6 @@ class ManagerClient:
|
||||
Replace CLI options and environment variables with `cmdline_options_override` and `append_env_override`
|
||||
if provided.
|
||||
"""
|
||||
if expected_error is not None:
|
||||
self.ignore_log_patterns.append(re.escape(expected_error))
|
||||
|
||||
logger.debug("ManagerClient starting %s", server_id)
|
||||
data = {
|
||||
"expected_error": expected_error,
|
||||
@@ -492,9 +412,6 @@ class ManagerClient:
|
||||
expected_server_up_state: Optional[ServerUpState] = None,
|
||||
connect_driver: bool = True) -> ServerInfo:
|
||||
"""Add a new server"""
|
||||
if expected_error is not None:
|
||||
self.ignore_log_patterns.append(re.escape(expected_error))
|
||||
|
||||
try:
|
||||
data = self._create_server_add_data(
|
||||
replace_cfg,
|
||||
@@ -523,7 +440,11 @@ class ManagerClient:
|
||||
except Exception as exc:
|
||||
raise Exception("Failed to add server") from exc
|
||||
try:
|
||||
s_info = ServerInfo(**server_info)
|
||||
s_info = ServerInfo(ServerNum(int(server_info["server_id"])),
|
||||
IPAddress(server_info["ip_addr"]),
|
||||
IPAddress(server_info["rpc_address"]),
|
||||
server_info["datacenter"],
|
||||
server_info["rack"])
|
||||
except Exception as exc:
|
||||
raise RuntimeError(f"server_add got invalid server data {server_info}") from exc
|
||||
logger.debug("ManagerClient added %s", s_info)
|
||||
@@ -552,9 +473,6 @@ class ManagerClient:
|
||||
assert servers_num > 0, f"servers_add: cannot add {servers_num} servers, servers_num must be positive"
|
||||
assert not (property_file and auto_rack_dc), f"Either property_file or auto_rack_dc can be provided, but not both"
|
||||
|
||||
if expected_error is not None:
|
||||
self.ignore_log_patterns.append(re.escape(expected_error))
|
||||
|
||||
if auto_rack_dc:
|
||||
property_file = [{"dc":auto_rack_dc, "rack":f"rack{i+1}"} for i in range(servers_num)]
|
||||
|
||||
@@ -571,7 +489,11 @@ class ManagerClient:
|
||||
s_infos = list[ServerInfo]()
|
||||
for server_info in server_infos:
|
||||
try:
|
||||
s_info = ServerInfo(**server_info)
|
||||
s_info = ServerInfo(ServerNum(int(server_info["server_id"])),
|
||||
IPAddress(server_info["ip_addr"]),
|
||||
IPAddress(server_info["rpc_address"]),
|
||||
server_info["datacenter"],
|
||||
server_info["rack"])
|
||||
s_infos.append(s_info)
|
||||
except Exception as exc:
|
||||
raise RuntimeError(f"servers_add got invalid server data {server_info}") from exc
|
||||
@@ -590,9 +512,6 @@ class ManagerClient:
|
||||
wait_removed_dead: bool = True,
|
||||
timeout: Optional[float] = ScyllaServer.TOPOLOGY_TIMEOUT) -> None:
|
||||
"""Invoke remove node Scylla REST API for a specified server"""
|
||||
if expected_error is not None:
|
||||
self.ignore_log_patterns.append(re.escape(expected_error))
|
||||
|
||||
logger.debug("ManagerClient remove node %s on initiator %s", server_id, initiator_id)
|
||||
|
||||
# If we remove a node, we should wait until other nodes see it as dead
|
||||
@@ -613,9 +532,6 @@ class ManagerClient:
|
||||
expected_error: str | None = None,
|
||||
timeout: Optional[float] = ScyllaServer.TOPOLOGY_TIMEOUT) -> None:
|
||||
"""Tell a node to decommission with Scylla REST API"""
|
||||
if expected_error is not None:
|
||||
self.ignore_log_patterns.append(re.escape(expected_error))
|
||||
|
||||
logger.debug("ManagerClient decommission %s", server_id)
|
||||
data = {"expected_error": expected_error}
|
||||
await self.client.put_json(f"/cluster/decommission-node/{server_id}", data,
|
||||
|
||||
@@ -477,8 +477,7 @@ class ScyllaServer:
|
||||
return "DEFAULT_RACK"
|
||||
|
||||
def server_info(self) -> ServerInfo:
|
||||
pid = self.cmd.pid if self.cmd else None
|
||||
return ServerInfo(self.server_id, self.ip_addr, self.rpc_address, self.datacenter, self.rack, pid)
|
||||
return ServerInfo(self.server_id, self.ip_addr, self.rpc_address, self.datacenter, self.rack)
|
||||
|
||||
def change_rpc_address(self, rpc_address: IPAddress) -> None:
|
||||
"""Change RPC IP address of the current server. Pre: the server is
|
||||
|
||||
@@ -28,7 +28,7 @@ def write_generator(table, size_in_kb: int):
|
||||
yield f"INSERT INTO {table} (pk, t) VALUES ({idx}, '{'x' * 1020}')"
|
||||
|
||||
|
||||
class random_content_file:
|
||||
class RandomContentFile:
|
||||
def __init__(self, path: str, size_in_bytes: int):
|
||||
path = pathlib.Path(path)
|
||||
self.filename = path if path.is_file() else path / str(uuid.uuid4())
|
||||
@@ -68,7 +68,7 @@ async def test_user_writes_rejection(manager: ManagerClient, volumes_factory: Ca
|
||||
|
||||
logger.info("Create a big file on the target node to reach critical disk utilization level")
|
||||
disk_info = psutil.disk_usage(workdir)
|
||||
with random_content_file(workdir, int(disk_info.total*0.85) - disk_info.used):
|
||||
with RandomContentFile(workdir, int(disk_info.total*0.85) - disk_info.used):
|
||||
for _ in range(2):
|
||||
mark, _ = await log.wait_for("database - Set critical disk utilization mode: true", from_mark=mark)
|
||||
|
||||
@@ -95,7 +95,7 @@ async def test_user_writes_rejection(manager: ManagerClient, volumes_factory: Ca
|
||||
assert await log.grep("database - Set critical disk utilization mode: false", from_mark=mark) == []
|
||||
|
||||
try:
|
||||
cql.execute(f"INSERT INTO {cf} (pk, t) VALUES (-1, 'x')", host=host[0], execution_profile=cl_one_profile).result()
|
||||
cql.execute(f"INSERT INTO {cf} (pk, t) VALUES (-1, 'x')", host=hosts[0], execution_profile=cl_one_profile).result()
|
||||
except Exception:
|
||||
pass
|
||||
else:
|
||||
@@ -111,7 +111,7 @@ async def test_user_writes_rejection(manager: ManagerClient, volumes_factory: Ca
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_autotoogle_compaction(manager: ManagerClient, volumes_factory: Callable) -> None:
|
||||
async def test_autotoggle_compaction(manager: ManagerClient, volumes_factory: Callable) -> None:
|
||||
cmdline = [*global_cmdline,
|
||||
"--logger-log-level", "compaction=debug"]
|
||||
async with space_limited_servers(manager, volumes_factory, ["100M"]*3, cmdline=cmdline) as servers:
|
||||
@@ -134,7 +134,7 @@ async def test_autotoogle_compaction(manager: ManagerClient, volumes_factory: Ca
|
||||
|
||||
logger.info("Create a big file on the target node to reach critical disk utilization level")
|
||||
disk_info = psutil.disk_usage(workdir)
|
||||
with random_content_file(workdir, int(disk_info.total*0.85) - disk_info.used):
|
||||
with RandomContentFile(workdir, int(disk_info.total*0.85) - disk_info.used):
|
||||
for _ in range(2):
|
||||
mark, _ = await log.wait_for("compaction_manager - Drained", from_mark=mark)
|
||||
|
||||
@@ -175,7 +175,7 @@ async def test_reject_split_compaction(manager: ManagerClient, volumes_factory:
|
||||
|
||||
logger.info("Create a big file on the target node to reach critical disk utilization level")
|
||||
disk_info = psutil.disk_usage(workdir)
|
||||
with random_content_file(workdir, int(disk_info.total*0.85) - disk_info.used):
|
||||
with RandomContentFile(workdir, int(disk_info.total*0.85) - disk_info.used):
|
||||
await log.wait_for(f"Split task .* for table {cf} .* stopped, reason: Compaction for {cf} was stopped due to: drain")
|
||||
|
||||
|
||||
@@ -198,7 +198,7 @@ async def test_split_compaction_not_triggered(manager: ManagerClient, volumes_fa
|
||||
|
||||
logger.info("Create a big file on the target node to reach critical disk utilization level")
|
||||
disk_info = psutil.disk_usage(workdir)
|
||||
with random_content_file(workdir, int(disk_info.total*0.85) - disk_info.used):
|
||||
with RandomContentFile(workdir, int(disk_info.total*0.85) - disk_info.used):
|
||||
for _ in range(2):
|
||||
s1_mark, _ = await s1_log.wait_for("compaction_manager - Drained", from_mark=s1_mark)
|
||||
|
||||
@@ -206,7 +206,7 @@ async def test_split_compaction_not_triggered(manager: ManagerClient, volumes_fa
|
||||
s2_mark = await s2_log.mark()
|
||||
cql.execute_async(f"ALTER KEYSPACE {ks} WITH tablets = {{'initial': 32}}")
|
||||
|
||||
s2_log.wait_for(f"compaction .* Split {cf}", from_mark=s2_mark)
|
||||
await s2_log.wait_for(f"compaction .* Split {cf}", from_mark=s2_mark)
|
||||
assert await s1_log.grep(f"compaction .* Split {cf}", from_mark=s1_mark) == []
|
||||
|
||||
|
||||
@@ -236,7 +236,7 @@ async def test_tablet_repair(manager: ManagerClient, volumes_factory: Callable)
|
||||
|
||||
logger.info("Create a big file on the target node to reach critical disk utilization level")
|
||||
disk_info = psutil.disk_usage(workdir)
|
||||
with random_content_file(workdir, int(disk_info.total*0.85) - disk_info.used):
|
||||
with RandomContentFile(workdir, int(disk_info.total*0.85) - disk_info.used):
|
||||
for _ in range(2):
|
||||
mark, _ = await log.wait_for("repair - Drained", from_mark=mark)
|
||||
|
||||
@@ -315,7 +315,7 @@ async def test_autotoogle_reject_incoming_migrations(manager: ManagerClient, vol
|
||||
mark = await log.mark()
|
||||
|
||||
disk_info = psutil.disk_usage(workdir)
|
||||
with random_content_file(workdir, int(disk_info.total*0.85) - disk_info.used):
|
||||
with RandomContentFile(workdir, int(disk_info.total*0.85) - disk_info.used):
|
||||
for _ in range(2):
|
||||
mark, _ = await log.wait_for("database - Set critical disk utilization mode: true", from_mark=mark)
|
||||
|
||||
@@ -371,7 +371,7 @@ async def test_node_restart_while_tablet_split(manager: ManagerClient, volumes_f
|
||||
|
||||
logger.info("Create a big file on the target node to reach critical disk utilization level")
|
||||
disk_info = psutil.disk_usage(workdir)
|
||||
with random_content_file(workdir, int(disk_info.total*0.85) - disk_info.used):
|
||||
with RandomContentFile(workdir, int(disk_info.total*0.85) - disk_info.used):
|
||||
for _ in range(2):
|
||||
mark, _ = await log.wait_for("compaction_manager - Drained", from_mark=mark)
|
||||
|
||||
@@ -382,7 +382,7 @@ async def test_node_restart_while_tablet_split(manager: ManagerClient, volumes_f
|
||||
coord_log = await manager.server_open_log(coord_serv.server_id)
|
||||
|
||||
await cql.run_async(f"ALTER TABLE {cf} WITH tablets = {{'min_tablet_count': 2}};")
|
||||
coord_log.wait_for(f"Generating resize decision for table {table_id} of type split")
|
||||
await coord_log.wait_for(f"Generating resize decision for table {table_id} of type split")
|
||||
|
||||
await manager.server_restart(servers[0].server_id, wait_others=2)
|
||||
|
||||
@@ -397,80 +397,3 @@ async def test_node_restart_while_tablet_split(manager: ManagerClient, volumes_f
|
||||
mark, _ = await log.wait_for("compaction_manager - Enabled", from_mark=mark)
|
||||
mark, _ = await log.wait_for(f"Detected tablet split for table {cf}, increasing from 1 to 2 tablets", from_mark=mark)
|
||||
await assert_resize_task_info(table_id, lambda response: len(response) == 2 and all(r.resize_task_info is None for r in response))
|
||||
|
||||
# Verify that new sstable produced by repair cannot be split, if disk utilization level is critical.
|
||||
@pytest.mark.asyncio
|
||||
@skip_mode('release', 'error injections are not supported in release mode')
|
||||
async def test_repair_failure_on_split_rejection(manager: ManagerClient, volumes_factory: Callable) -> None:
|
||||
cfg = {
|
||||
'tablet_load_stats_refresh_interval_in_seconds': 1,
|
||||
}
|
||||
async with space_limited_servers(manager, volumes_factory, ["100M"]*3, cmdline=global_cmdline, config=cfg) as servers:
|
||||
cql, _ = await manager.get_ready_cql(servers)
|
||||
workdir = await manager.server_get_workdir(servers[0].server_id)
|
||||
log = await manager.server_open_log(servers[0].server_id)
|
||||
mark = await log.mark()
|
||||
|
||||
logger.info("Create and populate test table")
|
||||
async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 3} AND tablets = {'initial': 2}") as ks:
|
||||
async with new_test_table(manager, ks, "pk int PRIMARY KEY, t text") as cf:
|
||||
table = cf.split('.')[-1]
|
||||
table_id = (await cql.run_async(f"SELECT id FROM system_schema.tables WHERE keyspace_name = '{ks}' AND table_name = '{table}'"))[0].id
|
||||
|
||||
await asyncio.gather(*[cql.run_async(query) for query in write_generator(cf, 64)])
|
||||
await manager.api.flush_keyspace(servers[0].ip_addr, ks)
|
||||
|
||||
coord = await get_topology_coordinator(manager)
|
||||
coord_serv = await find_server_by_host_id(manager, servers, coord)
|
||||
coord_log = await manager.server_open_log(coord_serv.server_id)
|
||||
|
||||
async def run_split():
|
||||
await manager.api.enable_injection(coord_serv.ip_addr, 'tablet_resize_finalization_postpone', one_shot=False)
|
||||
|
||||
# force split on the test table
|
||||
await cql.run_async(f"ALTER TABLE {cf} WITH tablets = {{'min_tablet_count': 4}}")
|
||||
|
||||
coord_log.wait_for(f"Generating resize decision for table {table_id} of type split")
|
||||
|
||||
async def generate_repair_work():
|
||||
insert_stmt = cql.prepare(f"INSERT INTO {cf} (pk, t) VALUES (?, ?)")
|
||||
insert_stmt.consistency_level = ConsistencyLevel.ONE
|
||||
|
||||
await manager.api.enable_injection(servers[0].ip_addr, "database_apply", one_shot=False)
|
||||
pks = range(256, 512)
|
||||
await asyncio.gather(*[cql.run_async(insert_stmt, (k, f'{k}')) for k in pks])
|
||||
await manager.api.disable_injection(servers[0].ip_addr, "database_apply")
|
||||
|
||||
await generate_repair_work()
|
||||
await manager.api.enable_injection(servers[0].ip_addr, "maybe_split_new_sstable_wait", one_shot=True)
|
||||
|
||||
token = 'all'
|
||||
repair_task = asyncio.create_task(manager.api.tablet_repair(servers[0].ip_addr, ks, table, token))
|
||||
|
||||
# Emit split decision during repair.
|
||||
await run_split()
|
||||
|
||||
await log.wait_for("maybe_split_new_sstable_wait: waiting", from_mark=mark)
|
||||
await manager.api.disable_injection(coord_serv.ip_addr, "tablet_resize_finalization_postpone")
|
||||
|
||||
logger.info("Create a big file on the target node to reach critical disk utilization level")
|
||||
disk_info = psutil.disk_usage(workdir)
|
||||
with random_content_file(workdir, int(disk_info.total*0.85) - disk_info.used):
|
||||
for _ in range(2):
|
||||
mark, _ = await log.wait_for("compaction_manager - Drained", from_mark=mark)
|
||||
|
||||
await manager.api.message_injection(servers[0].ip_addr, "maybe_split_new_sstable_wait")
|
||||
|
||||
# Expect repair to fail when splitting new sstables
|
||||
await log.wait_for("Repair for tablet migration of .* failed", from_mark=mark)
|
||||
await log.wait_for("Cannot split .* because manager has compaction disabled", from_mark=mark)
|
||||
|
||||
assert await log.grep(f"compaction .* Split {cf}", from_mark=mark) == []
|
||||
|
||||
logger.info("With blob file removed, wait for DB to drop below the critical disk utilization level")
|
||||
for _ in range(2):
|
||||
mark, _ = await log.wait_for("compaction_manager - Enabled", from_mark=mark)
|
||||
|
||||
await repair_task
|
||||
|
||||
mark, _ = await log.wait_for(f"Detected tablet split for table {cf}", from_mark=mark)
|
||||
|
||||
@@ -1,10 +1,6 @@
|
||||
## Copyright 2025-present ScyllaDB
|
||||
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
||||
|
||||
# This file is generated by cargo-toml-template. Do not edit directly.
|
||||
# To make changes, edit the template and regenerate with command:
|
||||
# "$ cargo-toml-template > Cargo.toml".
|
||||
|
||||
[workspace]
|
||||
members = ["crates/*"]
|
||||
default-members = ["crates/validator"]
|
||||
@@ -16,16 +12,13 @@ edition = "2024"
|
||||
|
||||
[workspace.dependencies]
|
||||
anyhow = "1.0.97"
|
||||
async-backtrace = "0.2.7"
|
||||
futures = "0.3.31"
|
||||
scylla = { version = "1.2.0", features = ["time-03"] }
|
||||
tokio = { version = "1.44.1", features = ["full"] }
|
||||
tracing = "0.1.41"
|
||||
uuid = "1.16.0"
|
||||
httpclient = { git = "https://github.com/scylladb/vector-store.git", rev = "d79ee80" }
|
||||
vector-search-validator-engine = { git = "https://github.com/scylladb/vector-store.git", rev = "d79ee80" }
|
||||
vector-search-validator-tests = { git = "https://github.com/scylladb/vector-store.git", rev = "d79ee80" }
|
||||
vector-store = { git = "https://github.com/scylladb/vector-store.git", rev = "d79ee80" }
|
||||
vector-search-validator-engine = { git = "https://github.com/scylladb/vector-store.git", rev = "3ee46a5" }
|
||||
vector-search-validator-tests = { git = "https://github.com/scylladb/vector-store.git", rev = "3ee46a5" }
|
||||
|
||||
[patch.'https://github.com/scylladb/scylladb.git']
|
||||
[patch.'https://github.com/scylladb/vector-store.git']
|
||||
vector-search-validator-scylla = { path = "crates/validator-scylla" }
|
||||
|
||||
@@ -8,8 +8,6 @@ namespace to separate it from the host environment. `vector-search-validator`
|
||||
contains DNS server and all tests in one binary. It uses external scylla and
|
||||
vector-store binaries.
|
||||
|
||||
## Running tests
|
||||
|
||||
The `test_validator.py::test_validator[test-case]` is the entry point for
running the tests. It is parametrized with the name of the test case. Available
test cases are taken dynamically from the `vector-search-validator` binary.
|
||||
@@ -39,22 +37,6 @@ $ pytest --mode=dev test/vector_search_validator/test_validator.py --filters fil
|
||||
Logs are stored in
|
||||
`testlog/{mode}/vector_search_validator/{test-case}-{run_id}/` directory.
|
||||
|
||||
## Development of test cases
|
||||
|
||||
`vector-search-validator` (in short `validator`) is divided into multiple
|
||||
crates:
|
||||
- `validator` - a main crate that contains only the entry point
|
||||
- `validator-scylla` - contains implementation of the validator tests on the
|
||||
scylladb.git side. If you want to add/modify the tests implemented in the
|
||||
scylladb.git, you will work in this crate.
|
||||
- `vector-store.git/validator-engine` - contains the core logic of the
|
||||
validator - overall test runner and implementation of actors for tests (dns
|
||||
server, scylla cluster, vector store cluster)
|
||||
- `vector-store.git/validator-tests` - contains the core logic of the framework
|
||||
tests, provides base structures for tests and actor interfaces. In the
|
||||
future we should check if it is possible to merge it with `validator-engine`
|
||||
crate.
|
||||
- `vector-store.git/validator-vector-store` - contains implementation of the
|
||||
validator tests on the vector-store.git side. If you want to add/modify the
|
||||
tests implemented in the vector-store.git, you will work in this crate.
|
||||
Implementing new test cases on the Scylla repository side means adding a new
test in the `crates/validator-scylla` crate.
|
||||
|
||||
|
||||
@@ -1,2 +1,2 @@
|
||||
VECTOR_STORE_GIT=https://github.com/scylladb/vector-store.git
|
||||
VECTOR_STORE_REV=d79ee80
|
||||
VECTOR_STORE_REV=3ee46a5
|
||||
|
||||
@@ -11,10 +11,6 @@ cat << EOF
|
||||
## Copyright 2025-present ScyllaDB
|
||||
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
||||
|
||||
# This file is generated by cargo-toml-template. Do not edit directly.
|
||||
# To make changes, edit the template and regenerate with command:
|
||||
# "\$ cargo-toml-template > Cargo.toml".
|
||||
|
||||
[workspace]
|
||||
members = ["crates/*"]
|
||||
default-members = ["crates/validator"]
|
||||
@@ -26,17 +22,14 @@ edition = "2024"
|
||||
|
||||
[workspace.dependencies]
|
||||
anyhow = "1.0.97"
|
||||
async-backtrace = "0.2.7"
|
||||
futures = "0.3.31"
|
||||
scylla = { version = "1.2.0", features = ["time-03"] }
|
||||
tokio = { version = "1.44.1", features = ["full"] }
|
||||
tracing = "0.1.41"
|
||||
uuid = "1.16.0"
|
||||
httpclient = { git = "$VECTOR_STORE_GIT", rev = "$VECTOR_STORE_REV" }
|
||||
vector-search-validator-engine = { git = "$VECTOR_STORE_GIT", rev = "$VECTOR_STORE_REV" }
|
||||
vector-search-validator-tests = { git = "$VECTOR_STORE_GIT", rev = "$VECTOR_STORE_REV" }
|
||||
vector-store = { git = "$VECTOR_STORE_GIT", rev = "$VECTOR_STORE_REV" }
|
||||
|
||||
[patch.'https://github.com/scylladb/scylladb.git']
|
||||
[patch.'$VECTOR_STORE_GIT']
|
||||
vector-search-validator-scylla = { path = "crates/validator-scylla" }
|
||||
EOF
|
||||
|
||||
@@ -4,11 +4,5 @@ version = "0.1.0"
|
||||
edition = "2024"
|
||||
|
||||
[dependencies]
|
||||
async-backtrace.workspace = true
|
||||
httpclient.workspace = true
|
||||
scylla.workspace = true
|
||||
tokio.workspace = true
|
||||
tracing.workspace = true
|
||||
uuid.workspace = true
|
||||
vector-search-validator-tests.workspace = true
|
||||
vector-store.workspace = true
|
||||
|
||||
@@ -1,302 +0,0 @@
|
||||
/*
|
||||
* Copyright 2025-present ScyllaDB
|
||||
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
||||
*/
|
||||
|
||||
use async_backtrace::framed;
|
||||
use httpclient::HttpClient;
|
||||
use scylla::client::session::Session;
|
||||
use scylla::client::session_builder::SessionBuilder;
|
||||
use scylla::response::query_result::QueryRowsResult;
|
||||
use std::collections::HashMap;
|
||||
use std::net::Ipv4Addr;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use tokio::time;
|
||||
use tracing::info;
|
||||
use uuid::Uuid;
|
||||
use vector_search_validator_tests::DnsExt;
|
||||
use vector_search_validator_tests::ScyllaClusterExt;
|
||||
use vector_search_validator_tests::ScyllaNodeConfig;
|
||||
use vector_search_validator_tests::TestActors;
|
||||
use vector_search_validator_tests::VectorStoreClusterExt;
|
||||
use vector_search_validator_tests::VectorStoreNodeConfig;
|
||||
use vector_store::httproutes::IndexStatus;
|
||||
use vector_store::IndexInfo;
|
||||
|
||||
/// Default timeout applied to validator test cases.
pub(crate) const DEFAULT_TEST_TIMEOUT: Duration = Duration::from_secs(120);

/// DNS names registered for the three vector-store nodes.
pub(crate) const VS_NAMES: [&str; 3] = ["vs1", "vs2", "vs3"];

/// HTTP port every vector-store node listens on.
pub(crate) const VS_PORT: u16 = 6080;

// Last-octet values used to derive node IPs from the services subnet:
// 1-3 for the ScyllaDB nodes, 128-130 for the vector-store nodes.
pub(crate) const DB_OCTET_1: u8 = 1;
pub(crate) const DB_OCTET_2: u8 = 2;
pub(crate) const DB_OCTET_3: u8 = 3;
pub(crate) const VS_OCTET_1: u8 = 128;
pub(crate) const VS_OCTET_2: u8 = 129;
pub(crate) const VS_OCTET_3: u8 = 130;
|
||||
|
||||
#[framed]
|
||||
pub(crate) async fn get_default_vs_urls(actors: &TestActors) -> Vec<String> {
|
||||
let domain = actors.dns.domain().await;
|
||||
VS_NAMES
|
||||
.iter()
|
||||
.map(|name| format!("http://{name}.{domain}:{VS_PORT}"))
|
||||
.collect()
|
||||
}
|
||||
|
||||
pub(crate) fn get_default_vs_ips(actors: &TestActors) -> Vec<Ipv4Addr> {
|
||||
vec![
|
||||
actors.services_subnet.ip(VS_OCTET_1),
|
||||
actors.services_subnet.ip(VS_OCTET_2),
|
||||
actors.services_subnet.ip(VS_OCTET_3),
|
||||
]
|
||||
}
|
||||
|
||||
pub(crate) fn get_default_db_ips(actors: &TestActors) -> Vec<Ipv4Addr> {
|
||||
vec![
|
||||
actors.services_subnet.ip(DB_OCTET_1),
|
||||
actors.services_subnet.ip(DB_OCTET_2),
|
||||
actors.services_subnet.ip(DB_OCTET_3),
|
||||
]
|
||||
}
|
||||
|
||||
/// Build the default per-node Scylla configuration: node `i` takes the
/// `i`-th vector-store URL as its primary URI and the remaining URLs as
/// secondaries.
#[framed]
pub(crate) async fn get_default_scylla_node_configs(actors: &TestActors) -> Vec<ScyllaNodeConfig> {
    let default_vs_urls = get_default_vs_urls(actors).await;
    get_default_db_ips(actors)
        .iter()
        .enumerate()
        .map(|(i, &ip)| {
            let mut vs_urls = default_vs_urls.clone();
            ScyllaNodeConfig {
                db_ip: ip,
                // remove(i) extracts this node's primary URL; the leftover
                // entries become its secondary URIs.
                primary_vs_uris: vec![vs_urls.remove(i)],
                secondary_vs_uris: vs_urls,
            }
        })
        .collect()
}
|
||||
|
||||
pub(crate) fn get_default_vs_node_configs(actors: &TestActors) -> Vec<VectorStoreNodeConfig> {
|
||||
let db_ips = get_default_db_ips(actors);
|
||||
get_default_vs_ips(actors)
|
||||
.iter()
|
||||
.zip(db_ips.iter())
|
||||
.map(|(&vs_ip, &db_ip)| VectorStoreNodeConfig {
|
||||
vs_ip,
|
||||
db_ip,
|
||||
envs: HashMap::new(),
|
||||
})
|
||||
.collect()
|
||||
}
|
||||
|
||||
/// Initialize the test environment with the default topology: three
/// ScyllaDB nodes and three vector-store nodes, paired 1:1.
#[framed]
pub(crate) async fn init(actors: TestActors) {
    info!("started");

    let scylla_configs = get_default_scylla_node_configs(&actors).await;
    let vs_configs = get_default_vs_node_configs(&actors);
    init_with_config(actors, scylla_configs, vs_configs).await;

    info!("finished");
}
|
||||
|
||||
/// Start the clusters with explicit per-node configurations.
///
/// DNS records for the vector-store names are registered first (so that the
/// vector-store URLs in the Scylla configs resolve), then the ScyllaDB
/// cluster is started and awaited, and finally the vector-store cluster.
/// Panics if either cluster fails to become ready.
#[framed]
pub(crate) async fn init_with_config(
    actors: TestActors,
    scylla_configs: Vec<ScyllaNodeConfig>,
    vs_configs: Vec<VectorStoreNodeConfig>,
) {
    // NOTE(review): DNS entries always use the *default* vector-store IPs,
    // even when custom vs_configs are passed — confirm this is intended.
    let vs_ips = get_default_vs_ips(&actors);
    for (name, ip) in VS_NAMES.iter().zip(vs_ips.iter()) {
        actors.dns.upsert(name.to_string(), *ip).await;
    }

    actors.db.start(scylla_configs, None).await;
    assert!(actors.db.wait_for_ready().await);
    actors.vs.start(vs_configs).await;
    assert!(actors.vs.wait_for_ready().await);
}
|
||||
|
||||
/// Tear down the test environment: remove the vector-store DNS records,
/// then stop the vector-store cluster and finally the ScyllaDB cluster
/// (reverse of the startup order in `init_with_config`).
#[framed]
pub(crate) async fn cleanup(actors: TestActors) {
    info!("started");
    for name in VS_NAMES.iter() {
        actors.dns.remove(name.to_string()).await;
    }
    actors.vs.stop().await;
    actors.db.stop().await;
    info!("finished");
}
|
||||
|
||||
/// Open a CQL session against the first ScyllaDB node and build one HTTP
/// client per given vector-store IP (all on `VS_PORT`).
///
/// Returns the shared session together with the clients; panics if the
/// session cannot be created.
#[framed]
pub(crate) async fn prepare_connection_with_custom_vs_ips(
    actors: &TestActors,
    vs_ips: Vec<Ipv4Addr>,
) -> (Arc<Session>, Vec<HttpClient>) {
    let session = Arc::new(
        SessionBuilder::new()
            // Only the first DB node is used as the contact point; the driver
            // discovers the rest of the cluster from it.
            .known_node(actors.services_subnet.ip(DB_OCTET_1).to_string())
            .build()
            .await
            .expect("failed to create session"),
    );
    let clients = vs_ips
        .iter()
        .map(|&ip| HttpClient::new((ip, VS_PORT).into()))
        .collect();
    (session, clients)
}
|
||||
|
||||
/// Poll `condition` every 100 ms until it returns `true`, panicking with
/// `msg` if it does not do so within `timeout`.
#[framed]
pub(crate) async fn wait_for<F, Fut>(mut condition: F, msg: &str, timeout: Duration)
where
    F: FnMut() -> Fut,
    Fut: std::future::Future<Output = bool>,
{
    time::timeout(timeout, async {
        while !condition().await {
            time::sleep(Duration::from_millis(100)).await;
        }
    })
    .await
    // Err here means the deadline elapsed before the condition held.
    .unwrap_or_else(|_| panic!("Timeout on: {msg}"))
}
|
||||
|
||||
/// Poll `poll_fn` every 100 ms until it yields `Some(value)` and return
/// that value, panicking with `msg` if nothing is produced within `timeout`.
#[framed]
pub(crate) async fn wait_for_value<F, Fut, T>(mut poll_fn: F, msg: &str, timeout: Duration) -> T
where
    F: FnMut() -> Fut,
    Fut: std::future::Future<Output = Option<T>>,
{
    time::timeout(timeout, async {
        loop {
            if let Some(value) = poll_fn().await {
                return value;
            }
            time::sleep(Duration::from_millis(100)).await;
        }
    })
    .await
    // Err here means the deadline elapsed before a value was produced.
    .unwrap_or_else(|_| panic!("Timeout on: {msg}"))
}
|
||||
|
||||
/// Wait (up to 20 s) until the given index reports `Serving` status on the
/// given vector-store node, returning the final status response.
#[framed]
pub(crate) async fn wait_for_index(
    client: &HttpClient,
    index: &IndexInfo,
) -> vector_store::httproutes::IndexStatusResponse {
    wait_for_value(
        || async {
            match client.index_status(&index.keyspace, &index.index).await {
                // Only a successful response in Serving state counts;
                // transport errors and other states keep us polling.
                Ok(resp) if resp.status == IndexStatus::Serving => Some(resp),
                _ => None,
            }
        },
        "Waiting for index to be SERVING",
        Duration::from_secs(20),
    )
    .await
}
|
||||
|
||||
#[framed]
|
||||
pub(crate) async fn get_query_results(query: String, session: &Session) -> QueryRowsResult {
|
||||
session
|
||||
.query_unpaged(query, ())
|
||||
.await
|
||||
.expect("failed to run query")
|
||||
.into_rows_result()
|
||||
.expect("failed to get rows")
|
||||
}
|
||||
|
||||
/// Create a uniquely-named keyspace (NetworkTopologyStrategy, RF=3) and
/// switch the session to it; returns the generated keyspace name.
#[framed]
pub(crate) async fn create_keyspace(session: &Session) -> String {
    let keyspace = format!("ks_{}", Uuid::new_v4().simple());

    // Create keyspace with replication factor of 3 for the 3-node cluster
    session.query_unpaged(
        format!("CREATE KEYSPACE {keyspace} WITH replication = {{'class': 'NetworkTopologyStrategy', 'replication_factor': 3}}"),
        (),
    ).await.expect("failed to create a keyspace");

    // Use keyspace
    session
        .use_keyspace(&keyspace, false)
        .await
        .expect("failed to use a keyspace");

    keyspace
}
|
||||
|
||||
#[framed]
|
||||
pub(crate) async fn create_table(
|
||||
session: &Session,
|
||||
columns: &str,
|
||||
options: Option<&str>,
|
||||
) -> String {
|
||||
let table = format!("tbl_{}", Uuid::new_v4().simple());
|
||||
|
||||
let extra = if let Some(options) = options {
|
||||
format!("WITH {options}")
|
||||
} else {
|
||||
String::new()
|
||||
};
|
||||
|
||||
// Create table
|
||||
session
|
||||
.query_unpaged(format!("CREATE TABLE {table} ({columns}) {extra}"), ())
|
||||
.await
|
||||
.expect("failed to create a table");
|
||||
|
||||
table
|
||||
}
|
||||
|
||||
/// Create a vector index on `table(column)` with a random name, wait (up to
/// 10 s) until every given vector-store client reports it, and return the
/// index's `IndexInfo` as seen by the first client.
///
/// Panics if index creation fails, if `clients` is empty, or if the index
/// does not show up everywhere in time.
#[framed]
pub(crate) async fn create_index(
    session: &Session,
    clients: &[HttpClient],
    table: &str,
    column: &str,
) -> IndexInfo {
    let index = format!("idx_{}", Uuid::new_v4().simple());

    // Create index
    session
        .query_unpaged(
            format!("CREATE INDEX {index} ON {table}({column}) USING 'vector_index'"),
            (),
        )
        .await
        .expect("failed to create an index");

    // Wait for the index to be created
    wait_for(
        || async {
            // Succeed only once *every* node lists the new index.
            for client in clients.iter() {
                if !client
                    .indexes()
                    .await
                    .iter()
                    .any(|idx| idx.index.to_string() == index)
                {
                    return false;
                }
            }
            true
        },
        "Waiting for the first index to be created",
        Duration::from_secs(10),
    )
    .await;

    // Re-fetch from the first client to return the full IndexInfo.
    clients
        .first()
        .expect("No vector store clients provided")
        .indexes()
        .await
        .into_iter()
        .find(|idx| idx.index.to_string() == index)
        .expect("index not found")
}
|
||||
@@ -3,13 +3,12 @@
|
||||
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
||||
*/
|
||||
|
||||
use crate::common;
|
||||
use async_backtrace::framed;
|
||||
use vector_search_validator_tests::TestCase;
|
||||
use std::time::Duration;
|
||||
use vector_search_validator_tests::common;
|
||||
use vector_search_validator_tests::*;
|
||||
|
||||
#[framed]
|
||||
pub(crate) async fn new() -> TestCase {
|
||||
let timeout = common::DEFAULT_TEST_TIMEOUT;
|
||||
let timeout = Duration::from_secs(30);
|
||||
TestCase::empty()
|
||||
.with_init(timeout, common::init)
|
||||
.with_cleanup(timeout, common::cleanup)
|
||||
|
||||
@@ -1,115 +0,0 @@
|
||||
/*
|
||||
* Copyright 2025-present ScyllaDB
|
||||
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
||||
*/
|
||||
|
||||
use crate::common;
|
||||
use async_backtrace::framed;
|
||||
use tracing::info;
|
||||
use vector_search_validator_tests::ScyllaClusterExt;
|
||||
use vector_search_validator_tests::ScyllaNodeConfig;
|
||||
use vector_search_validator_tests::TestActors;
|
||||
use vector_search_validator_tests::TestCase;
|
||||
use vector_search_validator_tests::VectorStoreNodeConfig;
|
||||
|
||||
/// Build the high-availability test case.
///
/// No `with_init` is registered: the test performs its own
/// `init_with_config` with a custom topology; only cleanup is shared.
#[framed]
pub(crate) async fn new() -> TestCase {
    let timeout = common::DEFAULT_TEST_TIMEOUT;
    TestCase::empty()
        .with_cleanup(timeout, common::cleanup)
        .with_test(
            "secondary_uri_works_correctly",
            timeout,
            test_secondary_uri_works_correctly,
        )
}
|
||||
|
||||
/// Verify that ANN queries keep working through *secondary* vector-store
/// URIs after the node holding the primary URI goes down.
///
/// Topology: a single vector-store node; DB node 1 lists its URL as
/// primary, DB nodes 2 and 3 list the same URL only as secondary. After
/// indexing 100 vectors, node 1 is brought down and an ANN query is
/// expected to still succeed via the secondary URIs.
#[framed]
async fn test_secondary_uri_works_correctly(actors: TestActors) {
    info!("started");

    let vs_urls = common::get_default_vs_urls(&actors).await;
    // Only the first vector-store URL is used in this scenario.
    let vs_url = &vs_urls[0];

    let scylla_configs: Vec<ScyllaNodeConfig> = vec![
        ScyllaNodeConfig {
            db_ip: actors.services_subnet.ip(common::DB_OCTET_1),
            primary_vs_uris: vec![vs_url.clone()],
            secondary_vs_uris: vec![],
        },
        ScyllaNodeConfig {
            db_ip: actors.services_subnet.ip(common::DB_OCTET_2),
            primary_vs_uris: vec![],
            secondary_vs_uris: vec![vs_url.clone()],
        },
        ScyllaNodeConfig {
            db_ip: actors.services_subnet.ip(common::DB_OCTET_3),
            primary_vs_uris: vec![],
            secondary_vs_uris: vec![vs_url.clone()],
        },
    ];
    // Single vector-store node, attached to DB node 1.
    let vs_configs = vec![VectorStoreNodeConfig {
        vs_ip: actors.services_subnet.ip(common::VS_OCTET_1),
        db_ip: actors.services_subnet.ip(common::DB_OCTET_1),
        envs: Default::default(),
    }];
    common::init_with_config(actors.clone(), scylla_configs, vs_configs).await;

    let vs_ips = vec![actors.services_subnet.ip(common::VS_OCTET_1)];
    let (session, clients) = common::prepare_connection_with_custom_vs_ips(&actors, vs_ips).await;

    let keyspace = common::create_keyspace(&session).await;
    let table =
        common::create_table(&session, "pk INT PRIMARY KEY, v VECTOR<FLOAT, 3>", None).await;

    // Insert vectors
    for i in 0..100 {
        let embedding = vec![i as f32, (i * 2) as f32, (i * 3) as f32];
        session
            .query_unpaged(
                format!("INSERT INTO {table} (pk, v) VALUES (?, ?)"),
                (i, &embedding),
            )
            .await
            .expect("failed to insert data");
    }

    let index = common::create_index(&session, &clients, &table, "v").await;

    // All 100 rows must be indexed before the failover part of the test.
    for client in &clients {
        let index_status = common::wait_for_index(&client, &index).await;

        assert_eq!(
            index_status.count, 100,
            "Expected 100 vectors to be indexed"
        );
    }

    // Down the first node with primary URI
    let first_node_ip = actors.services_subnet.ip(common::DB_OCTET_1);
    info!("Bringing down node {first_node_ip}");
    actors.db.down_node(first_node_ip).await;

    // Should work via secondary URIs
    let results = common::get_query_results(
        format!("SELECT pk FROM {table} ORDER BY v ANN OF [0.0, 0.0, 0.0] LIMIT 10"),
        &session,
    )
    .await;

    let rows = results
        .rows::<(i32,)>()
        .expect("failed to get rows after node down");
    assert!(
        rows.rows_remaining() <= 10,
        "Expected at most 10 results from ANN query after node down"
    );

    // Drop keyspace
    session
        .query_unpaged(format!("DROP KEYSPACE {keyspace}"), ())
        .await
        .expect("failed to drop a keyspace");

    info!("finished");
}
|
||||
@@ -3,19 +3,12 @@
|
||||
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
||||
*/
|
||||
|
||||
mod common;
|
||||
mod cql;
|
||||
mod high_availability;
|
||||
|
||||
use async_backtrace::framed;
|
||||
use vector_search_validator_tests::TestCase;
|
||||
|
||||
#[framed]
|
||||
pub async fn test_cases() -> impl Iterator<Item = (String, TestCase)> {
|
||||
vec![
|
||||
("scylla_cql", cql::new().await),
|
||||
("scylla_high_availability", high_availability::new().await),
|
||||
]
|
||||
.into_iter()
|
||||
.map(|(name, test_case)| (name.to_string(), test_case))
|
||||
vec![("cql", cql::new().await)]
|
||||
.into_iter()
|
||||
.map(|(name, test_case)| (name.to_string(), test_case))
|
||||
}
|
||||
|
||||
Submodule tools/cqlsh updated: 9e5a91d73a...22401228d2
@@ -819,7 +819,7 @@ public:
|
||||
virtual compaction::compaction_strategy_state& get_compaction_strategy_state() noexcept override { return _compaction_strategy_state; }
|
||||
virtual reader_permit make_compaction_reader_permit() const override { return _permit; }
|
||||
virtual sstables::sstables_manager& get_sstables_manager() noexcept override { return _sst_man; }
|
||||
virtual sstables::shared_sstable make_sstable(sstables::sstable_state) const override { return do_make_sstable(); }
|
||||
virtual sstables::shared_sstable make_sstable() const override { return do_make_sstable(); }
|
||||
virtual sstables::sstable_writer_config configure_writer(sstring origin) const override { return do_configure_writer(std::move(origin)); }
|
||||
virtual api::timestamp_type min_memtable_timestamp() const override { return api::min_timestamp; }
|
||||
virtual api::timestamp_type min_memtable_live_timestamp() const override { return api::min_timestamp; }
|
||||
@@ -909,7 +909,7 @@ void scrub_operation(schema_ptr schema, reader_permit permit, const std::vector<
|
||||
|
||||
auto compaction_descriptor = compaction::compaction_descriptor(std::move(sstables));
|
||||
compaction_descriptor.options = compaction::compaction_type_options::make_scrub(scrub_mode, compaction::compaction_type_options::scrub::quarantine_invalid_sstables::no);
|
||||
compaction_descriptor.creator = [&compaction_group_view] (shard_id) { return compaction_group_view.make_sstable(sstables::sstable_state::normal); };
|
||||
compaction_descriptor.creator = [&compaction_group_view] (shard_id) { return compaction_group_view.make_sstable(); };
|
||||
compaction_descriptor.replacer = [] (compaction::compaction_completion_desc) { };
|
||||
|
||||
auto compaction_data = compaction::compaction_data{};
|
||||
|
||||
@@ -1 +1 @@
|
||||
docker.io/scylladb/scylla-toolchain:fedora-43-20251217
|
||||
docker.io/scylladb/scylla-toolchain:fedora-43-20251208
|
||||
|
||||
Reference in New Issue
Block a user