Merge 'Add keyspace_offstrategy_compaction api' from Benny Halevy

This series adds methods to perform offstrategy compaction, if needed, returning a future<bool>
so the caller can wait on it until compaction completes.
The returned value is true iff offstrategy compaction was needed.

The added keyspace_offstrategy_compaction calls perform_offstrategy_compaction on the specified keyspace and tables, return the number of tables that required offstrategy compaction.

A respective unit test was added to the rest_api pytest.

This PR replaces https://github.com/scylladb/scylla/pull/9095 that suggested adding an option to `keyspace_compaction`
since offstrategy compaction triggering logic is different enough from major compaction meriting a new api.

Test: unit (dev)

Closes #9980

* github.com:scylladb/scylla:
  test: rest_api: add unit tests for keyspace_offstrategy_compaction api
  api: add keyspace_offstrategy_compaction
  compaction_manager: get rid of submit_offstrategy
  table: add perform_offstrategy_compaction
  compaction_manager: perform_offstrategy: print ks.cf in log messages
  compaction_manager: allow waiting on offstrategy compaction
This commit is contained in:
Botond Dénes
2022-02-02 13:15:31 +02:00
7 changed files with 94 additions and 8 deletions

View File

@@ -765,6 +765,38 @@
}
]
},
{
"path":"/storage_service/keyspace_offstrategy_compaction/{keyspace}",
"operations":[
{
"method":"POST",
"summary":"Perform offstrategy compaction, if needed, in a single keyspace",
"type":"boolean",
"nickname":"perform_keyspace_offstrategy_compaction",
"produces":[
"application/json"
],
"parameters":[
{
"name":"keyspace",
"description":"The keyspace to operate on",
"required":true,
"allowMultiple":false,
"type":"string",
"paramType":"path"
},
{
"name":"cf",
"description":"Comma-seperated table names",
"required":false,
"allowMultiple":false,
"type":"string",
"paramType":"query"
}
]
}
]
},
{
"path":"/storage_service/keyspace_scrub/{keyspace}",
"operations":[

View File

@@ -644,6 +644,17 @@ void set_storage_service(http_context& ctx, routes& r, sharded<service::storage_
});
});
ss::perform_keyspace_offstrategy_compaction.set(r, wrap_ks_cf(ctx, [] (http_context& ctx, std::unique_ptr<request> req, sstring keyspace, std::vector<sstring> tables) -> future<json::json_return_type> {
co_return co_await ctx.db.map_reduce0([&keyspace, &tables] (replica::database& db) -> future<bool> {
bool needed = false;
for (const auto& table : tables) {
auto& t = db.find_column_family(keyspace, table);
needed |= co_await t.perform_offstrategy_compaction();
}
co_return needed;
}, false, std::plus<bool>());
}));
ss::upgrade_sstables.set(r, wrap_ks_cf(ctx, [] (http_context& ctx, std::unique_ptr<request> req, sstring keyspace, std::vector<sstring> column_families) {
bool exclude_current_version = req_param<bool>(*req, "exclude_current_version", false);

View File

@@ -722,7 +722,7 @@ void compaction_manager::submit(replica::table* t) {
});
}
void compaction_manager::submit_offstrategy(replica::table* t) {
future<> compaction_manager::perform_offstrategy(replica::table* t) {
auto task = make_lw_shared<compaction_manager::task>(t, sstables::compaction_type::Reshape, get_compaction_state(t));
_tasks.push_back(task);
_stats.pending_tasks++;
@@ -742,21 +742,21 @@ void compaction_manager::submit_offstrategy(replica::table* t) {
_stats.active_tasks++;
task->setup_new_compaction();
return t->run_offstrategy_compaction(task->compaction_data).then_wrapped([this, task] (future<> f) mutable {
return t->run_offstrategy_compaction(task->compaction_data).then_wrapped([this, task, schema = t->schema()] (future<> f) mutable {
_stats.active_tasks--;
task->finish_compaction();
try {
f.get();
_stats.completed_tasks++;
} catch (sstables::compaction_stopped_exception& e) {
cmlog.info("off-strategy compaction: {}", e.what());
cmlog.info("off-strategy compaction of {}.{} was stopped: {}", schema->ks_name(), schema->cf_name(), e.what());
} catch (sstables::compaction_aborted_exception& e) {
_stats.errors++;
cmlog.error("off-strategy compaction: {}", e.what());
cmlog.error("off-strategy compaction of {}.{} was aborted: {}", schema->ks_name(), schema->cf_name(), e.what());
} catch (...) {
_stats.errors++;
_stats.pending_tasks++;
cmlog.error("off-strategy compaction failed due to {}, retrying...", std::current_exception());
cmlog.error("off-strategy compaction of {}.{} failed due to {}, retrying...", schema->ks_name(), schema->cf_name(), std::current_exception());
return put_task_to_sleep(task).then([] {
return make_ready_future<stop_iteration>(stop_iteration::no);
});
@@ -769,6 +769,7 @@ void compaction_manager::submit_offstrategy(replica::table* t) {
_tasks.remove(task);
cmlog.debug("Offstrategy compaction task {} table={}: done", fmt::ptr(task.get()), fmt::ptr(task->compacting_table));
});
return task->compaction_done.get_future().finally([task] {});
}
future<> compaction_manager::rewrite_sstables(replica::table* t, sstables::compaction_type_options options, get_candidates_func get_func, can_purge_tombstones can_purge) {

View File

@@ -239,7 +239,7 @@ public:
void submit(replica::table* t);
// Submit a table to be off-strategy compacted.
void submit_offstrategy(replica::table* t);
future<> perform_offstrategy(replica::table* t);
// Submit a table to be cleaned up and wait for its termination.
//

View File

@@ -889,7 +889,12 @@ public:
void start_compaction();
void trigger_compaction();
void try_trigger_compaction() noexcept;
// Triggers offstrategy compaction, if needed, in the background.
void trigger_offstrategy_compaction();
// Performs offstrategy compaction, if needed, returning
// a future<bool> that is resolved when offstrategy_compaction completes.
// The future value is true iff offstrategy compaction was required.
future<bool> perform_offstrategy_compaction();
future<> run_offstrategy_compaction(sstables::compaction_data& info);
void set_compaction_strategy(sstables::compaction_strategy_type strategy);
const sstables::compaction_strategy& get_compaction_strategy() const {

View File

@@ -1027,12 +1027,26 @@ void table::do_trigger_compaction() {
}
void table::trigger_offstrategy_compaction() {
// Run in background.
// This is safe since the the compaction task is tracked
// by the compaction_manager until stop()
(void)perform_offstrategy_compaction().then_wrapped([this] (future<bool> f) {
if (f.failed()) {
auto ex = f.get_exception();
tlogger.warn("Offstrategy compaction of {}.{} failed: {}, ignoring", schema()->ks_name(), schema()->cf_name(), ex);
}
});
}
future<bool> table::perform_offstrategy_compaction() {
// If the user calls trigger_offstrategy_compaction() to trigger
// off-strategy explicitly, cancel the timeout based automatic trigger.
_off_strategy_trigger.cancel();
if (!_maintenance_sstables->all()->empty()) {
_compaction_manager.submit_offstrategy(this);
if (_maintenance_sstables->all()->empty()) {
co_return false;
}
co_await _compaction_manager.perform_offstrategy(this);
co_return true;
}
future<> table::run_offstrategy_compaction(sstables::compaction_data& info) {

View File

@@ -72,3 +72,26 @@ def test_storage_service_auto_compaction_tables(cql, this_dc, rest_api):
assert resp.status_code == requests.codes.bad_request
cql.execute(f"DROP KEYSPACE {keyspace}")
def test_storage_service_keyspace_offstrategy_compaction(cql, this_dc, rest_api):
keyspace = new_keyspace(cql, this_dc)
with new_test_table(cql, keyspace, "a int, PRIMARY KEY (a)") as t0:
resp = rest_api.send("POST", f"storage_service/keyspace_offstrategy_compaction/{keyspace}")
resp.raise_for_status()
cql.execute(f"DROP KEYSPACE {keyspace}")
def test_storage_service_keyspace_offstrategy_compaction_tables(cql, this_dc, rest_api):
keyspace = new_keyspace(cql, this_dc)
with new_test_table(cql, keyspace, "a int, PRIMARY KEY (a)") as t0:
with new_test_table(cql, keyspace, "a int, PRIMARY KEY (a)") as t1:
test_tables = [t0.split('.')[1], t1.split('.')[1]]
resp = rest_api.send("POST", f"storage_service/keyspace_offstrategy_compaction/{keyspace}", { "cf": f"{test_tables[0]},{test_tables[1]}" })
resp.raise_for_status()
# non-existing table
resp = rest_api.send("POST", f"storage_service/keyspace_offstrategy_compaction/{keyspace}", { "cf": f"{test_tables[0]},XXX" })
assert resp.status_code == requests.codes.bad_request
cql.execute(f"DROP KEYSPACE {keyspace}")