diff --git a/api/api-doc/storage_service.json b/api/api-doc/storage_service.json index 8dc9a19c05..d218b8fa85 100644 --- a/api/api-doc/storage_service.json +++ b/api/api-doc/storage_service.json @@ -765,6 +765,38 @@ } ] }, + { + "path":"/storage_service/keyspace_offstrategy_compaction/{keyspace}", + "operations":[ + { + "method":"POST", + "summary":"Perform offstrategy compaction, if needed, in a single keyspace", + "type":"boolean", + "nickname":"perform_keyspace_offstrategy_compaction", + "produces":[ + "application/json" + ], + "parameters":[ + { + "name":"keyspace", + "description":"The keyspace to operate on", + "required":true, + "allowMultiple":false, + "type":"string", + "paramType":"path" + }, + { + "name":"cf", + "description":"Comma-seperated table names", + "required":false, + "allowMultiple":false, + "type":"string", + "paramType":"query" + } + ] + } + ] + }, { "path":"/storage_service/keyspace_scrub/{keyspace}", "operations":[ diff --git a/api/storage_service.cc b/api/storage_service.cc index 1c17b57107..6e41652046 100644 --- a/api/storage_service.cc +++ b/api/storage_service.cc @@ -644,6 +644,17 @@ void set_storage_service(http_context& ctx, routes& r, sharded req, sstring keyspace, std::vector tables) -> future { + co_return co_await ctx.db.map_reduce0([&keyspace, &tables] (replica::database& db) -> future { + bool needed = false; + for (const auto& table : tables) { + auto& t = db.find_column_family(keyspace, table); + needed |= co_await t.perform_offstrategy_compaction(); + } + co_return needed; + }, false, std::plus()); + })); + ss::upgrade_sstables.set(r, wrap_ks_cf(ctx, [] (http_context& ctx, std::unique_ptr req, sstring keyspace, std::vector column_families) { bool exclude_current_version = req_param(*req, "exclude_current_version", false); diff --git a/compaction/compaction_manager.cc b/compaction/compaction_manager.cc index 28ba3744a0..a7477421e6 100644 --- a/compaction/compaction_manager.cc +++ b/compaction/compaction_manager.cc @@ -722,7 +722,7 @@ void compaction_manager::submit(replica::table* t) { }); } -void compaction_manager::submit_offstrategy(replica::table* t) { +future<> compaction_manager::perform_offstrategy(replica::table* t) { auto task = make_lw_shared(t, sstables::compaction_type::Reshape, get_compaction_state(t)); _tasks.push_back(task); _stats.pending_tasks++; @@ -742,21 +742,21 @@ void compaction_manager::submit_offstrategy(replica::table* t) { _stats.active_tasks++; task->setup_new_compaction(); - return t->run_offstrategy_compaction(task->compaction_data).then_wrapped([this, task] (future<> f) mutable { + return t->run_offstrategy_compaction(task->compaction_data).then_wrapped([this, task, schema = t->schema()] (future<> f) mutable { _stats.active_tasks--; task->finish_compaction(); try { f.get(); _stats.completed_tasks++; } catch (sstables::compaction_stopped_exception& e) { - cmlog.info("off-strategy compaction: {}", e.what()); + cmlog.info("off-strategy compaction of {}.{} was stopped: {}", schema->ks_name(), schema->cf_name(), e.what()); } catch (sstables::compaction_aborted_exception& e) { _stats.errors++; - cmlog.error("off-strategy compaction: {}", e.what()); + cmlog.error("off-strategy compaction of {}.{} was aborted: {}", schema->ks_name(), schema->cf_name(), e.what()); } catch (...) { _stats.errors++; _stats.pending_tasks++; - cmlog.error("off-strategy compaction failed due to {}, retrying...", std::current_exception()); + cmlog.error("off-strategy compaction of {}.{} failed due to {}, retrying...", schema->ks_name(), schema->cf_name(), std::current_exception()); return put_task_to_sleep(task).then([] { return make_ready_future(stop_iteration::no); }); @@ -769,6 +769,7 @@ void compaction_manager::submit_offstrategy(replica::table* t) { _tasks.remove(task); cmlog.debug("Offstrategy compaction task {} table={}: done", fmt::ptr(task.get()), fmt::ptr(task->compacting_table)); }); + return task->compaction_done.get_future().finally([task] {}); } future<> compaction_manager::rewrite_sstables(replica::table* t, sstables::compaction_type_options options, get_candidates_func get_func, can_purge_tombstones can_purge) { diff --git a/compaction/compaction_manager.hh b/compaction/compaction_manager.hh index e9c19b70a0..fea2c78f8f 100644 --- a/compaction/compaction_manager.hh +++ b/compaction/compaction_manager.hh @@ -239,7 +239,7 @@ public: void submit(replica::table* t); // Submit a table to be off-strategy compacted. - void submit_offstrategy(replica::table* t); + future<> perform_offstrategy(replica::table* t); // Submit a table to be cleaned up and wait for its termination. // diff --git a/replica/database.hh b/replica/database.hh index a802945ba4..cb8f6aac4e 100644 --- a/replica/database.hh +++ b/replica/database.hh @@ -889,7 +889,12 @@ public: void start_compaction(); void trigger_compaction(); void try_trigger_compaction() noexcept; + // Triggers offstrategy compaction, if needed, in the background. void trigger_offstrategy_compaction(); + // Performs offstrategy compaction, if needed, returning + // a future that is resolved when offstrategy_compaction completes. + // The future value is true iff offstrategy compaction was required. + future perform_offstrategy_compaction(); future<> run_offstrategy_compaction(sstables::compaction_data& info); void set_compaction_strategy(sstables::compaction_strategy_type strategy); const sstables::compaction_strategy& get_compaction_strategy() const { diff --git a/replica/table.cc b/replica/table.cc index 37555c20aa..df7e2ba60b 100644 --- a/replica/table.cc +++ b/replica/table.cc @@ -1027,12 +1027,26 @@ void table::do_trigger_compaction() { } void table::trigger_offstrategy_compaction() { + // Run in background. + // This is safe since the the compaction task is tracked + // by the compaction_manager until stop() + (void)perform_offstrategy_compaction().then_wrapped([this] (future f) { + if (f.failed()) { + auto ex = f.get_exception(); + tlogger.warn("Offstrategy compaction of {}.{} failed: {}, ignoring", schema()->ks_name(), schema()->cf_name(), ex); + } + }); +} + +future table::perform_offstrategy_compaction() { // If the user calls trigger_offstrategy_compaction() to trigger // off-strategy explicitly, cancel the timeout based automatic trigger. _off_strategy_trigger.cancel(); - if (!_maintenance_sstables->all()->empty()) { - _compaction_manager.submit_offstrategy(this); + if (_maintenance_sstables->all()->empty()) { + co_return false; } + co_await _compaction_manager.perform_offstrategy(this); + co_return true; } future<> table::run_offstrategy_compaction(sstables::compaction_data& info) { diff --git a/test/rest_api/test_storage_service.py b/test/rest_api/test_storage_service.py index 05cedb40b2..164e0c8bc6 100644 --- a/test/rest_api/test_storage_service.py +++ b/test/rest_api/test_storage_service.py @@ -72,3 +72,26 @@ def test_storage_service_auto_compaction_tables(cql, this_dc, rest_api): assert resp.status_code == requests.codes.bad_request cql.execute(f"DROP KEYSPACE {keyspace}") + +def test_storage_service_keyspace_offstrategy_compaction(cql, this_dc, rest_api): + keyspace = new_keyspace(cql, this_dc) + with new_test_table(cql, keyspace, "a int, PRIMARY KEY (a)") as t0: + resp = rest_api.send("POST", f"storage_service/keyspace_offstrategy_compaction/{keyspace}") + resp.raise_for_status() + + cql.execute(f"DROP KEYSPACE {keyspace}") + +def test_storage_service_keyspace_offstrategy_compaction_tables(cql, this_dc, rest_api): + keyspace = new_keyspace(cql, this_dc) + with new_test_table(cql, keyspace, "a int, PRIMARY KEY (a)") as t0: + with new_test_table(cql, keyspace, "a int, PRIMARY KEY (a)") as t1: + test_tables = [t0.split('.')[1], t1.split('.')[1]] + + resp = rest_api.send("POST", f"storage_service/keyspace_offstrategy_compaction/{keyspace}", { "cf": f"{test_tables[0]},{test_tables[1]}" }) + resp.raise_for_status() + + # non-existing table + resp = rest_api.send("POST", f"storage_service/keyspace_offstrategy_compaction/{keyspace}", { "cf": f"{test_tables[0]},XXX" }) + assert resp.status_code == requests.codes.bad_request + + cql.execute(f"DROP KEYSPACE {keyspace}")