diff --git a/api/storage_service.cc b/api/storage_service.cc index b723fb0339..af092bba72 100644 --- a/api/storage_service.cc +++ b/api/storage_service.cc @@ -649,10 +649,11 @@ void set_storage_service(http_context& ctx, routes& r, sharded(*req, "exclude_current_version", false); return ctx.db.invoke_on_all([=] (replica::database& db) { + auto owned_ranges_ptr = compaction::make_owned_ranges_ptr(db.get_keyspace_local_ranges(keyspace)); return do_for_each(column_families, [=, &db](sstring cfname) { auto& cm = db.get_compaction_manager(); auto& cf = db.find_column_family(keyspace, cfname); - return cm.perform_sstable_upgrade(db, cf.as_table_state(), exclude_current_version); + return cm.perform_sstable_upgrade(owned_ranges_ptr, cf.as_table_state(), exclude_current_version); }); }).then([]{ return make_ready_future(0); diff --git a/compaction/compaction_manager.cc b/compaction/compaction_manager.cc index 8a09f2c9ce..2da23c176b 100644 --- a/compaction/compaction_manager.cc +++ b/compaction/compaction_manager.cc @@ -11,7 +11,6 @@ #include "compaction_backlog_manager.hh" #include "sstables/sstables.hh" #include "sstables/sstables_manager.hh" -#include "replica/database.hh" #include #include #include @@ -1448,7 +1447,7 @@ bool needs_cleanup(const sstables::shared_sstable& sst, return true; } -future<> compaction_manager::perform_cleanup(replica::database& db, compaction::table_state& t) { +future<> compaction_manager::perform_cleanup(owned_ranges_ptr sorted_owned_ranges, compaction::table_state& t) { auto check_for_cleanup = [this, &t] { return boost::algorithm::any_of(_tasks, [&t] (auto& task) { return task->compacting_table() == &t && task->type() == sstables::compaction_type::Cleanup; @@ -1459,9 +1458,8 @@ future<> compaction_manager::perform_cleanup(replica::database& db, compaction:: t.schema()->ks_name(), t.schema()->cf_name())); } - auto sorted_owned_ranges = make_owned_ranges_ptr(db.get_keyspace_local_ranges(t.schema()->ks_name())); - auto get_sstables = [this, &db, &t, sorted_owned_ranges] () -> future> { - return seastar::async([this, &db, &t, sorted_owned_ranges = std::move(sorted_owned_ranges)] { + auto get_sstables = [this, &t, sorted_owned_ranges] () -> future> { + return seastar::async([this, &t, sorted_owned_ranges = std::move(sorted_owned_ranges)] { auto schema = t.schema(); auto sstables = std::vector{}; const auto candidates = get_candidates(t); @@ -1478,8 +1476,8 @@ future<> compaction_manager::perform_cleanup(replica::database& db, compaction:: } // Submit a table to be upgraded and wait for its termination. -future<> compaction_manager::perform_sstable_upgrade(replica::database& db, compaction::table_state& t, bool exclude_current_version) { - auto get_sstables = [this, &db, &t, exclude_current_version] { +future<> compaction_manager::perform_sstable_upgrade(owned_ranges_ptr sorted_owned_ranges, compaction::table_state& t, bool exclude_current_version) { + auto get_sstables = [this, &t, exclude_current_version] { std::vector tables; auto last_version = t.get_sstables_manager().get_highest_supported_format(); @@ -1502,7 +1500,6 @@ future<> compaction_manager::perform_sstable_upgrade(replica::database& db, comp // Note that we potentially could be doing multiple // upgrades here in parallel, but that is really the users // problem. - auto sorted_owned_ranges = make_owned_ranges_ptr(db.get_keyspace_local_ranges(t.schema()->ks_name())); return rewrite_sstables(t, sstables::compaction_type_options::make_upgrade(std::move(sorted_owned_ranges)), std::move(get_sstables)).discard_result(); } diff --git a/compaction/compaction_manager.hh b/compaction/compaction_manager.hh index cb6e8557e7..f92efa7b74 100644 --- a/compaction/compaction_manager.hh +++ b/compaction/compaction_manager.hh @@ -421,10 +421,10 @@ public: // Cleanup is about discarding keys that are no longer relevant for a // given sstable, e.g. after node loses part of its token range because // of a newly added node. - future<> perform_cleanup(replica::database& db, compaction::table_state& t); + future<> perform_cleanup(owned_ranges_ptr sorted_owned_ranges, compaction::table_state& t); // Submit a table to be upgraded and wait for its termination. - future<> perform_sstable_upgrade(replica::database& db, compaction::table_state& t, bool exclude_current_version); + future<> perform_sstable_upgrade(owned_ranges_ptr sorted_owned_ranges, compaction::table_state& t, bool exclude_current_version); // Submit a table to be scrubbed and wait for its termination. future perform_sstable_scrub(compaction::table_state& t, sstables::compaction_type_options::scrub opts); diff --git a/test/boost/database_test.cc b/test/boost/database_test.cc index 83f7f7afa8..77f31c06e4 100644 --- a/test/boost/database_test.cc +++ b/test/boost/database_test.cc @@ -1028,12 +1028,16 @@ SEASTAR_THREAD_TEST_CASE(max_result_size_for_unlimited_query_selection_test) { // Refs: #9494 (https://github.com/scylladb/scylla/issues/9494) SEASTAR_TEST_CASE(upgrade_sstables) { return do_with_cql_env_thread([] (cql_test_env& e) { - e.db().invoke_on_all([&e] (replica::database& db) { + e.db().invoke_on_all([&e] (replica::database& db) -> future<> { auto& cm = db.get_compaction_manager(); - return do_for_each(db.get_column_families(), [&] (std::pair> t) { - constexpr bool exclude_current_version = false; - return cm.perform_sstable_upgrade(db, t.second->as_table_state(), exclude_current_version); - }); + for (auto& [ks_name, ks] : db.get_keyspaces()) { + auto owned_ranges_ptr = compaction::make_owned_ranges_ptr(db.get_keyspace_local_ranges(ks_name)); + for (auto& [cf_name, schema] : ks.metadata()->cf_meta_data()) { + auto& t = db.find_column_family(schema->id()); + constexpr bool exclude_current_version = false; + co_await cm.perform_sstable_upgrade(owned_ranges_ptr, t.as_table_state(), exclude_current_version); + } + } }).get(); }); } diff --git a/test/rest_api/test_storage_service.py b/test/rest_api/test_storage_service.py index c0635e6927..c3be2d0965 100644 --- a/test/rest_api/test_storage_service.py +++ b/test/rest_api/test_storage_service.py @@ -410,3 +410,64 @@ def test_describe_ring(cql, this_dc, rest_api): with new_test_keyspace(cql, f"WITH REPLICATION = {{ 'class' : 'NetworkTopologyStrategy', '{this_dc}' : 1 }}") as keyspace: resp = rest_api.send("GET", f"storage_service/describe_ring/{keyspace}") resp.raise_for_status() + +def test_storage_service_keyspace_cleanup(cql, this_dc, rest_api): + with new_test_keyspace(cql, f"WITH REPLICATION = {{ 'class' : 'NetworkTopologyStrategy', '{this_dc}' : 1 }}") as keyspace: + schema = 'p int, v text, primary key (p)' + with new_test_table(cql, keyspace, schema) as t0: + stmt = cql.prepare(f"INSERT INTO {t0} (p, v) VALUES (?, ?)") + cql.execute(stmt, [0, 'hello']) + + with new_test_table(cql, keyspace, schema) as t1: + stmt = cql.prepare(f"INSERT INTO {t1} (p, v) VALUES (?, ?)") + cql.execute(stmt, [1, 'world']) + + test_tables = [t0.split('.')[1], t1.split('.')[1]] + + resp = rest_api.send("POST", f"storage_service/keyspace_flush/{keyspace}") + resp.raise_for_status() + + resp = rest_api.send("POST", f"storage_service/keyspace_cleanup/{keyspace}") + resp.raise_for_status() + + resp = rest_api.send("POST", f"storage_service/keyspace_cleanup/{keyspace}", { "cf": f"{test_tables[1]}" }) + resp.raise_for_status() + + resp = rest_api.send("POST", f"storage_service/keyspace_cleanup/{keyspace}", { "cf": f"{test_tables[0]},{test_tables[1]}" }) + resp.raise_for_status() + + # non-existing table + resp = rest_api.send("POST", f"storage_service/keyspace_cleanup/{keyspace}", { "cf": f"{test_tables[0]},XXX" }) + assert resp.status_code == requests.codes.bad_request + +def test_storage_service_keyspace_upgrade_sstables(cql, this_dc, rest_api): + with new_test_keyspace(cql, f"WITH REPLICATION = {{ 'class' : 'NetworkTopologyStrategy', '{this_dc}' : 1 }}") as keyspace: + schema = 'p int, v text, primary key (p)' + with new_test_table(cql, keyspace, schema) as t0: + stmt = cql.prepare(f"INSERT INTO {t0} (p, v) VALUES (?, ?)") + cql.execute(stmt, [0, 'hello']) + + with new_test_table(cql, keyspace, schema) as t1: + stmt = cql.prepare(f"INSERT INTO {t1} (p, v) VALUES (?, ?)") + cql.execute(stmt, [1, 'world']) + + test_tables = [t0.split('.')[1], t1.split('.')[1]] + + resp = rest_api.send("POST", f"storage_service/keyspace_flush/{keyspace}") + resp.raise_for_status() + + resp = rest_api.send("GET", f"storage_service/keyspace_upgrade_sstables/{keyspace}") + resp.raise_for_status() + + resp = rest_api.send("GET", f"storage_service/keyspace_upgrade_sstables/{keyspace}", { "exclude_current_version": "true" }) + resp.raise_for_status() + + resp = rest_api.send("GET", f"storage_service/keyspace_upgrade_sstables/{keyspace}", { "cf": f"{test_tables[1]}" }) + resp.raise_for_status() + + resp = rest_api.send("GET", f"storage_service/keyspace_upgrade_sstables/{keyspace}", { "cf": f"{test_tables[0]},{test_tables[1]}" }) + resp.raise_for_status() + + # non-existing table + resp = rest_api.send("GET", f"storage_service/keyspace_upgrade_sstables/{keyspace}", { "cf": f"{test_tables[0]},XXX" }) + assert resp.status_code == requests.codes.bad_request