Merge 'tasks: compaction: drop regular compaction tasks after they are finished' from Aleksandra Martyniuk

Make compaction tasks internal. Drop all internal tasks without parents
immediately after they are done.

Fixes: #16735
Refs: #16694.

Closes scylladb/scylladb#16698

* github.com:scylladb/scylladb:
  compaction: make regular compaction tasks internal
  tasks: don't keep internal root tasks after they complete
This commit is contained in:
Botond Dénes
2024-01-11 12:10:44 +02:00
3 changed files with 14 additions and 34 deletions

View File

@@ -707,6 +707,10 @@ public:
virtual std::string type() const override {
return "regular compaction";
}
virtual tasks::is_internal is_internal() const noexcept override {
return tasks::is_internal::yes;
}
protected:
virtual future<> run() override = 0;
};

View File

@@ -197,7 +197,11 @@ void task_manager::task::start() {
// Background fiber does not capture task ptr, so the task can be unregistered and destroyed independently in the foreground.
// After the ttl expires, the task id will be used to unregister the task if that didn't happen in any other way.
auto module = _impl->_module;
(void)done().finally([module] {
bool drop_after_complete = is_internal() && !get_parent_id();
(void)done().finally([module, drop_after_complete] {
if (drop_after_complete) {
return make_ready_future<>();
}
return sleep_abortable(module->get_task_manager().get_task_ttl(), module->abort_source());
}).then_wrapped([module, id = id()] (auto f) {
f.ignore_ready_future();

View File

@@ -5,7 +5,7 @@ import threading
sys.path.insert(1, sys.path[0] + '/../cql-pytest')
from util import new_test_table, new_test_keyspace
from rest_util import set_tmp_task_ttl, scylla_inject_error
from task_manager_utils import wait_for_task, list_tasks, check_child_parent_relationship, drain_module_tasks, abort_task
from task_manager_utils import wait_for_task, list_tasks, check_child_parent_relationship, drain_module_tasks, abort_task, get_task_status
module_name = "compaction"
long_time = 1000000000
@@ -106,27 +106,6 @@ def test_resharding_compaction_task(cql, this_dc, rest_api):
check_compaction_task(cql, this_dc, rest_api, lambda keyspace, table: rest_api.send("POST", f"storage_service/sstables/{keyspace}", {'cf': table, 'load_and_stream': False}), "resharding compaction", task_tree_depth, True)
def test_regular_compaction_task(cql, this_dc, rest_api):
drain_module_tasks(rest_api, module_name)
with set_tmp_task_ttl(rest_api, long_time):
with new_test_keyspace(cql, f"WITH REPLICATION = {{ 'class' : 'NetworkTopologyStrategy', '{this_dc}' : 1 }}") as keyspace:
schema = 'p int, v text, primary key (p)'
with new_test_table(cql, keyspace, schema) as t0:
stmt = cql.prepare(f"INSERT INTO {t0} (p, v) VALUES (?, ?)")
cql.execute(stmt, [0, 'hello'])
cql.execute(stmt, [1, 'world'])
[_, table] = t0.split(".")
resp = rest_api.send("POST", f"column_family/autocompaction/{keyspace}:{table}")
resp.raise_for_status()
statuses = [wait_for_task(rest_api, task["task_id"]) for task in list_tasks(rest_api, "compaction") if task["type"] == "regular compaction" and task["keyspace"] == keyspace and task["table"] == table]
assert statuses, f"regular compaction task for {t0} was not created"
failed = [status["task_id"] for status in statuses if status["state"] != "done"]
assert not failed, f"Regular compaction tasks with ids = {failed} failed"
drain_module_tasks(rest_api, module_name)
def test_running_compaction_task_abort(cql, this_dc, rest_api):
drain_module_tasks(rest_api, module_name)
with set_tmp_task_ttl(rest_api, long_time):
with new_test_keyspace(cql, f"WITH REPLICATION = {{ 'class' : 'NetworkTopologyStrategy', '{this_dc}' : 1 }}") as keyspace:
@@ -142,22 +121,15 @@ def test_running_compaction_task_abort(cql, this_dc, rest_api):
resp = rest_api.send("POST", f"column_family/autocompaction/{keyspace}:{table}")
resp.raise_for_status()
tasks = [task for task in list_tasks(rest_api, module_name, False, keyspace, table) if task["type"] == "regular compaction"]
assert tasks, "Compaction task was not created"
for task in tasks:
abort_task(rest_api, task["task_id"])
statuses = [get_task_status(rest_api, task["task_id"]) for task in list_tasks(rest_api, "compaction", internal=True) if task["type"] == "regular compaction" and task["keyspace"] == keyspace and task["table"] == table]
assert statuses, f"regular compaction task for {t0} was not created"
assert all([s["state"] != "done" and s["state"] != "failed" for s in statuses]), "Regular compaction task isn't unregiatered after it completes"
resp = rest_api.send("POST", f"v2/error_injection/injection/{injection}/message")
resp.raise_for_status()
statuses = [wait_for_task(rest_api, task["task_id"]) for task in tasks]
aborted = [status for status in statuses if "abort requested" in status["error"]]
assert aborted, "Task wasn't aborted by user"
assert all([status["state"] == "failed" for status in aborted]), "Task finished successfully"
drain_module_tasks(rest_api, module_name)
def test_not_created_compaction_task_abort(cql, this_dc, rest_api):
def test_compaction_task_abort(cql, this_dc, rest_api):
drain_module_tasks(rest_api, module_name)
with set_tmp_task_ttl(rest_api, long_time):
with new_test_keyspace(cql, f"WITH REPLICATION = {{ 'class' : 'NetworkTopologyStrategy', '{this_dc}' : 1 }}") as keyspace: