compaction_manager: perform_cleanup, perform_sstable_upgrade: use a lw_shared_ptr for owned token ranges

And completely get rid of the dependency on replica::database.

Also, add respective rest_api tests.

Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
This commit is contained in:
Benny Halevy
2022-07-21 19:14:36 +03:00
parent e1fe598760
commit 14faa3b6f4
5 changed files with 81 additions and 17 deletions

View File

@@ -649,10 +649,11 @@ void set_storage_service(http_context& ctx, routes& r, sharded<service::storage_
return db.find_column_family(id).get_stats().live_disk_space_used;
});
auto& cm = db.get_compaction_manager();
auto owned_ranges_ptr = compaction::make_owned_ranges_ptr(db.get_keyspace_local_ranges(keyspace));
// as a table can be dropped during loop below, let's find it before issuing the cleanup request.
for (auto& id : table_ids) {
replica::table& t = db.find_column_family(id);
co_await cm.perform_cleanup(db, t.as_table_state());
co_await cm.perform_cleanup(owned_ranges_ptr, t.as_table_state());
}
co_return;
}).then([]{
@@ -676,10 +677,11 @@ void set_storage_service(http_context& ctx, routes& r, sharded<service::storage_
bool exclude_current_version = req_param<bool>(*req, "exclude_current_version", false);
return ctx.db.invoke_on_all([=] (replica::database& db) {
auto owned_ranges_ptr = compaction::make_owned_ranges_ptr(db.get_keyspace_local_ranges(keyspace));
return do_for_each(column_families, [=, &db](sstring cfname) {
auto& cm = db.get_compaction_manager();
auto& cf = db.find_column_family(keyspace, cfname);
return cm.perform_sstable_upgrade(db, cf.as_table_state(), exclude_current_version);
return cm.perform_sstable_upgrade(owned_ranges_ptr, cf.as_table_state(), exclude_current_version);
});
}).then([]{
return make_ready_future<json::json_return_type>(0);

View File

@@ -11,7 +11,6 @@
#include "compaction_backlog_manager.hh"
#include "sstables/sstables.hh"
#include "sstables/sstables_manager.hh"
#include "replica/database.hh"
#include <seastar/core/metrics.hh>
#include <seastar/core/coroutine.hh>
#include <seastar/coroutine/switch_to.hh>
@@ -1448,7 +1447,7 @@ bool needs_cleanup(const sstables::shared_sstable& sst,
return true;
}
future<> compaction_manager::perform_cleanup(replica::database& db, compaction::table_state& t) {
future<> compaction_manager::perform_cleanup(owned_ranges_ptr sorted_owned_ranges, compaction::table_state& t) {
auto check_for_cleanup = [this, &t] {
return boost::algorithm::any_of(_tasks, [&t] (auto& task) {
return task->compacting_table() == &t && task->type() == sstables::compaction_type::Cleanup;
@@ -1459,9 +1458,8 @@ future<> compaction_manager::perform_cleanup(replica::database& db, compaction::
t.schema()->ks_name(), t.schema()->cf_name()));
}
auto sorted_owned_ranges = make_owned_ranges_ptr(db.get_keyspace_local_ranges(t.schema()->ks_name()));
auto get_sstables = [this, &db, &t, sorted_owned_ranges] () -> future<std::vector<sstables::shared_sstable>> {
return seastar::async([this, &db, &t, sorted_owned_ranges = std::move(sorted_owned_ranges)] {
auto get_sstables = [this, &t, sorted_owned_ranges] () -> future<std::vector<sstables::shared_sstable>> {
return seastar::async([this, &t, sorted_owned_ranges = std::move(sorted_owned_ranges)] {
auto schema = t.schema();
auto sstables = std::vector<sstables::shared_sstable>{};
const auto candidates = get_candidates(t);
@@ -1478,8 +1476,8 @@ future<> compaction_manager::perform_cleanup(replica::database& db, compaction::
}
// Submit a table to be upgraded and wait for its termination.
future<> compaction_manager::perform_sstable_upgrade(replica::database& db, compaction::table_state& t, bool exclude_current_version) {
auto get_sstables = [this, &db, &t, exclude_current_version] {
future<> compaction_manager::perform_sstable_upgrade(owned_ranges_ptr sorted_owned_ranges, compaction::table_state& t, bool exclude_current_version) {
auto get_sstables = [this, &t, exclude_current_version] {
std::vector<sstables::shared_sstable> tables;
auto last_version = t.get_sstables_manager().get_highest_supported_format();
@@ -1502,7 +1500,6 @@ future<> compaction_manager::perform_sstable_upgrade(replica::database& db, comp
// Note that we potentially could be doing multiple
// upgrades here in parallel, but that is really the users
// problem.
auto sorted_owned_ranges = make_owned_ranges_ptr(db.get_keyspace_local_ranges(t.schema()->ks_name()));
return rewrite_sstables(t, sstables::compaction_type_options::make_upgrade(std::move(sorted_owned_ranges)), std::move(get_sstables)).discard_result();
}

View File

@@ -421,10 +421,10 @@ public:
// Cleanup is about discarding keys that are no longer relevant for a
// given sstable, e.g. after node loses part of its token range because
// of a newly added node.
future<> perform_cleanup(replica::database& db, compaction::table_state& t);
future<> perform_cleanup(owned_ranges_ptr sorted_owned_ranges, compaction::table_state& t);
// Submit a table to be upgraded and wait for its termination.
future<> perform_sstable_upgrade(replica::database& db, compaction::table_state& t, bool exclude_current_version);
future<> perform_sstable_upgrade(owned_ranges_ptr sorted_owned_ranges, compaction::table_state& t, bool exclude_current_version);
// Submit a table to be scrubbed and wait for its termination.
future<compaction_stats_opt> perform_sstable_scrub(compaction::table_state& t, sstables::compaction_type_options::scrub opts);

View File

@@ -1028,12 +1028,16 @@ SEASTAR_THREAD_TEST_CASE(max_result_size_for_unlimited_query_selection_test) {
// Refs: #9494 (https://github.com/scylladb/scylla/issues/9494)
SEASTAR_TEST_CASE(upgrade_sstables) {
return do_with_cql_env_thread([] (cql_test_env& e) {
e.db().invoke_on_all([&e] (replica::database& db) {
e.db().invoke_on_all([&e] (replica::database& db) -> future<> {
auto& cm = db.get_compaction_manager();
return do_for_each(db.get_column_families(), [&] (std::pair<utils::UUID, lw_shared_ptr<replica::column_family>> t) {
constexpr bool exclude_current_version = false;
return cm.perform_sstable_upgrade(db, t.second->as_table_state(), exclude_current_version);
});
for (auto& [ks_name, ks] : db.get_keyspaces()) {
auto owned_ranges_ptr = compaction::make_owned_ranges_ptr(db.get_keyspace_local_ranges(ks_name));
for (auto& [cf_name, schema] : ks.metadata()->cf_meta_data()) {
auto& t = db.find_column_family(schema->id());
constexpr bool exclude_current_version = false;
co_await cm.perform_sstable_upgrade(owned_ranges_ptr, t.as_table_state(), exclude_current_version);
}
}
}).get();
});
}

View File

@@ -410,3 +410,64 @@ def test_describe_ring(cql, this_dc, rest_api):
with new_test_keyspace(cql, f"WITH REPLICATION = {{ 'class' : 'NetworkTopologyStrategy', '{this_dc}' : 1 }}") as keyspace:
resp = rest_api.send("GET", f"storage_service/describe_ring/{keyspace}")
resp.raise_for_status()
def test_storage_service_keyspace_cleanup(cql, this_dc, rest_api):
with new_test_keyspace(cql, f"WITH REPLICATION = {{ 'class' : 'NetworkTopologyStrategy', '{this_dc}' : 1 }}") as keyspace:
schema = 'p int, v text, primary key (p)'
with new_test_table(cql, keyspace, schema) as t0:
stmt = cql.prepare(f"INSERT INTO {t0} (p, v) VALUES (?, ?)")
cql.execute(stmt, [0, 'hello'])
with new_test_table(cql, keyspace, schema) as t1:
stmt = cql.prepare(f"INSERT INTO {t1} (p, v) VALUES (?, ?)")
cql.execute(stmt, [1, 'world'])
test_tables = [t0.split('.')[1], t1.split('.')[1]]
resp = rest_api.send("POST", f"storage_service/keyspace_flush/{keyspace}")
resp.raise_for_status()
resp = rest_api.send("POST", f"storage_service/keyspace_cleanup/{keyspace}")
resp.raise_for_status()
resp = rest_api.send("POST", f"storage_service/keyspace_cleanup/{keyspace}", { "cf": f"{test_tables[1]}" })
resp.raise_for_status()
resp = rest_api.send("POST", f"storage_service/keyspace_cleanup/{keyspace}", { "cf": f"{test_tables[0]},{test_tables[1]}" })
resp.raise_for_status()
# non-existing table
resp = rest_api.send("POST", f"storage_service/keyspace_cleanup/{keyspace}", { "cf": f"{test_tables[0]},XXX" })
assert resp.status_code == requests.codes.bad_request
def test_storage_service_keyspace_upgrade_sstables(cql, this_dc, rest_api):
with new_test_keyspace(cql, f"WITH REPLICATION = {{ 'class' : 'NetworkTopologyStrategy', '{this_dc}' : 1 }}") as keyspace:
schema = 'p int, v text, primary key (p)'
with new_test_table(cql, keyspace, schema) as t0:
stmt = cql.prepare(f"INSERT INTO {t0} (p, v) VALUES (?, ?)")
cql.execute(stmt, [0, 'hello'])
with new_test_table(cql, keyspace, schema) as t1:
stmt = cql.prepare(f"INSERT INTO {t1} (p, v) VALUES (?, ?)")
cql.execute(stmt, [1, 'world'])
test_tables = [t0.split('.')[1], t1.split('.')[1]]
resp = rest_api.send("POST", f"storage_service/keyspace_flush/{keyspace}")
resp.raise_for_status()
resp = rest_api.send("GET", f"storage_service/keyspace_upgrade_sstables/{keyspace}")
resp.raise_for_status()
resp = rest_api.send("GET", f"storage_service/keyspace_upgrade_sstables/{keyspace}", { "exclude_current_version": "true" })
resp.raise_for_status()
resp = rest_api.send("GET", f"storage_service/keyspace_upgrade_sstables/{keyspace}", { "cf": f"{test_tables[1]}" })
resp.raise_for_status()
resp = rest_api.send("GET", f"storage_service/keyspace_upgrade_sstables/{keyspace}", { "cf": f"{test_tables[0]},{test_tables[1]}" })
resp.raise_for_status()
# non-existing table
resp = rest_api.send("GET", f"storage_service/keyspace_upgrade_sstables/{keyspace}", { "cf": f"{test_tables[0]},XXX" })
assert resp.status_code == requests.codes.bad_request