Merge 'Add keyspace_offstrategy_compaction api' from Benny Halevy
This series adds methods to perform offstrategy compaction, if needed, returning a future<bool> so the caller can wait on it until compaction completes. The returned value is true iff offstrategy compaction was needed. The added keyspace_offstrategy_compaction calls perform_offstrategy_compaction on the specified keyspace and tables, return the number of tables that required offstrategy compaction. A respective unit test was added to the rest_api pytest. This PR replaces https://github.com/scylladb/scylla/pull/9095 that suggested adding an option to `keyspace_compaction` since offstrategy compaction triggering logic is different enough from major compaction meriting a new api. Test: unit (dev) Closes #9980 * github.com:scylladb/scylla: test: rest_api: add unit tests for keyspace_offstrategy_compaction api api: add keyspace_offstrategy_compaction compaction_manager: get rid of submit_offstrategy table: add perform_offstrategy_compaction compaction_manager: perform_offstrategy: print ks.cf in log messages compaction_manager: allow waiting on offstrategy compaction
This commit is contained in:
@@ -765,6 +765,38 @@
|
||||
}
|
||||
]
|
||||
},
|
||||
{
|
||||
"path":"/storage_service/keyspace_offstrategy_compaction/{keyspace}",
|
||||
"operations":[
|
||||
{
|
||||
"method":"POST",
|
||||
"summary":"Perform offstrategy compaction, if needed, in a single keyspace",
|
||||
"type":"boolean",
|
||||
"nickname":"perform_keyspace_offstrategy_compaction",
|
||||
"produces":[
|
||||
"application/json"
|
||||
],
|
||||
"parameters":[
|
||||
{
|
||||
"name":"keyspace",
|
||||
"description":"The keyspace to operate on",
|
||||
"required":true,
|
||||
"allowMultiple":false,
|
||||
"type":"string",
|
||||
"paramType":"path"
|
||||
},
|
||||
{
|
||||
"name":"cf",
|
||||
"description":"Comma-seperated table names",
|
||||
"required":false,
|
||||
"allowMultiple":false,
|
||||
"type":"string",
|
||||
"paramType":"query"
|
||||
}
|
||||
]
|
||||
}
|
||||
]
|
||||
},
|
||||
{
|
||||
"path":"/storage_service/keyspace_scrub/{keyspace}",
|
||||
"operations":[
|
||||
|
||||
@@ -644,6 +644,17 @@ void set_storage_service(http_context& ctx, routes& r, sharded<service::storage_
|
||||
});
|
||||
});
|
||||
|
||||
ss::perform_keyspace_offstrategy_compaction.set(r, wrap_ks_cf(ctx, [] (http_context& ctx, std::unique_ptr<request> req, sstring keyspace, std::vector<sstring> tables) -> future<json::json_return_type> {
|
||||
co_return co_await ctx.db.map_reduce0([&keyspace, &tables] (replica::database& db) -> future<bool> {
|
||||
bool needed = false;
|
||||
for (const auto& table : tables) {
|
||||
auto& t = db.find_column_family(keyspace, table);
|
||||
needed |= co_await t.perform_offstrategy_compaction();
|
||||
}
|
||||
co_return needed;
|
||||
}, false, std::plus<bool>());
|
||||
}));
|
||||
|
||||
ss::upgrade_sstables.set(r, wrap_ks_cf(ctx, [] (http_context& ctx, std::unique_ptr<request> req, sstring keyspace, std::vector<sstring> column_families) {
|
||||
bool exclude_current_version = req_param<bool>(*req, "exclude_current_version", false);
|
||||
|
||||
|
||||
@@ -722,7 +722,7 @@ void compaction_manager::submit(replica::table* t) {
|
||||
});
|
||||
}
|
||||
|
||||
void compaction_manager::submit_offstrategy(replica::table* t) {
|
||||
future<> compaction_manager::perform_offstrategy(replica::table* t) {
|
||||
auto task = make_lw_shared<compaction_manager::task>(t, sstables::compaction_type::Reshape, get_compaction_state(t));
|
||||
_tasks.push_back(task);
|
||||
_stats.pending_tasks++;
|
||||
@@ -742,21 +742,21 @@ void compaction_manager::submit_offstrategy(replica::table* t) {
|
||||
_stats.active_tasks++;
|
||||
task->setup_new_compaction();
|
||||
|
||||
return t->run_offstrategy_compaction(task->compaction_data).then_wrapped([this, task] (future<> f) mutable {
|
||||
return t->run_offstrategy_compaction(task->compaction_data).then_wrapped([this, task, schema = t->schema()] (future<> f) mutable {
|
||||
_stats.active_tasks--;
|
||||
task->finish_compaction();
|
||||
try {
|
||||
f.get();
|
||||
_stats.completed_tasks++;
|
||||
} catch (sstables::compaction_stopped_exception& e) {
|
||||
cmlog.info("off-strategy compaction: {}", e.what());
|
||||
cmlog.info("off-strategy compaction of {}.{} was stopped: {}", schema->ks_name(), schema->cf_name(), e.what());
|
||||
} catch (sstables::compaction_aborted_exception& e) {
|
||||
_stats.errors++;
|
||||
cmlog.error("off-strategy compaction: {}", e.what());
|
||||
cmlog.error("off-strategy compaction of {}.{} was aborted: {}", schema->ks_name(), schema->cf_name(), e.what());
|
||||
} catch (...) {
|
||||
_stats.errors++;
|
||||
_stats.pending_tasks++;
|
||||
cmlog.error("off-strategy compaction failed due to {}, retrying...", std::current_exception());
|
||||
cmlog.error("off-strategy compaction of {}.{} failed due to {}, retrying...", schema->ks_name(), schema->cf_name(), std::current_exception());
|
||||
return put_task_to_sleep(task).then([] {
|
||||
return make_ready_future<stop_iteration>(stop_iteration::no);
|
||||
});
|
||||
@@ -769,6 +769,7 @@ void compaction_manager::submit_offstrategy(replica::table* t) {
|
||||
_tasks.remove(task);
|
||||
cmlog.debug("Offstrategy compaction task {} table={}: done", fmt::ptr(task.get()), fmt::ptr(task->compacting_table));
|
||||
});
|
||||
return task->compaction_done.get_future().finally([task] {});
|
||||
}
|
||||
|
||||
future<> compaction_manager::rewrite_sstables(replica::table* t, sstables::compaction_type_options options, get_candidates_func get_func, can_purge_tombstones can_purge) {
|
||||
|
||||
@@ -239,7 +239,7 @@ public:
|
||||
void submit(replica::table* t);
|
||||
|
||||
// Submit a table to be off-strategy compacted.
|
||||
void submit_offstrategy(replica::table* t);
|
||||
future<> perform_offstrategy(replica::table* t);
|
||||
|
||||
// Submit a table to be cleaned up and wait for its termination.
|
||||
//
|
||||
|
||||
@@ -889,7 +889,12 @@ public:
|
||||
void start_compaction();
|
||||
void trigger_compaction();
|
||||
void try_trigger_compaction() noexcept;
|
||||
// Triggers offstrategy compaction, if needed, in the background.
|
||||
void trigger_offstrategy_compaction();
|
||||
// Performs offstrategy compaction, if needed, returning
|
||||
// a future<bool> that is resolved when offstrategy_compaction completes.
|
||||
// The future value is true iff offstrategy compaction was required.
|
||||
future<bool> perform_offstrategy_compaction();
|
||||
future<> run_offstrategy_compaction(sstables::compaction_data& info);
|
||||
void set_compaction_strategy(sstables::compaction_strategy_type strategy);
|
||||
const sstables::compaction_strategy& get_compaction_strategy() const {
|
||||
|
||||
@@ -1027,12 +1027,26 @@ void table::do_trigger_compaction() {
|
||||
}
|
||||
|
||||
void table::trigger_offstrategy_compaction() {
|
||||
// Run in background.
|
||||
// This is safe since the the compaction task is tracked
|
||||
// by the compaction_manager until stop()
|
||||
(void)perform_offstrategy_compaction().then_wrapped([this] (future<bool> f) {
|
||||
if (f.failed()) {
|
||||
auto ex = f.get_exception();
|
||||
tlogger.warn("Offstrategy compaction of {}.{} failed: {}, ignoring", schema()->ks_name(), schema()->cf_name(), ex);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
future<bool> table::perform_offstrategy_compaction() {
|
||||
// If the user calls trigger_offstrategy_compaction() to trigger
|
||||
// off-strategy explicitly, cancel the timeout based automatic trigger.
|
||||
_off_strategy_trigger.cancel();
|
||||
if (!_maintenance_sstables->all()->empty()) {
|
||||
_compaction_manager.submit_offstrategy(this);
|
||||
if (_maintenance_sstables->all()->empty()) {
|
||||
co_return false;
|
||||
}
|
||||
co_await _compaction_manager.perform_offstrategy(this);
|
||||
co_return true;
|
||||
}
|
||||
|
||||
future<> table::run_offstrategy_compaction(sstables::compaction_data& info) {
|
||||
|
||||
@@ -72,3 +72,26 @@ def test_storage_service_auto_compaction_tables(cql, this_dc, rest_api):
|
||||
assert resp.status_code == requests.codes.bad_request
|
||||
|
||||
cql.execute(f"DROP KEYSPACE {keyspace}")
|
||||
|
||||
def test_storage_service_keyspace_offstrategy_compaction(cql, this_dc, rest_api):
|
||||
keyspace = new_keyspace(cql, this_dc)
|
||||
with new_test_table(cql, keyspace, "a int, PRIMARY KEY (a)") as t0:
|
||||
resp = rest_api.send("POST", f"storage_service/keyspace_offstrategy_compaction/{keyspace}")
|
||||
resp.raise_for_status()
|
||||
|
||||
cql.execute(f"DROP KEYSPACE {keyspace}")
|
||||
|
||||
def test_storage_service_keyspace_offstrategy_compaction_tables(cql, this_dc, rest_api):
|
||||
keyspace = new_keyspace(cql, this_dc)
|
||||
with new_test_table(cql, keyspace, "a int, PRIMARY KEY (a)") as t0:
|
||||
with new_test_table(cql, keyspace, "a int, PRIMARY KEY (a)") as t1:
|
||||
test_tables = [t0.split('.')[1], t1.split('.')[1]]
|
||||
|
||||
resp = rest_api.send("POST", f"storage_service/keyspace_offstrategy_compaction/{keyspace}", { "cf": f"{test_tables[0]},{test_tables[1]}" })
|
||||
resp.raise_for_status()
|
||||
|
||||
# non-existing table
|
||||
resp = rest_api.send("POST", f"storage_service/keyspace_offstrategy_compaction/{keyspace}", { "cf": f"{test_tables[0]},XXX" })
|
||||
assert resp.status_code == requests.codes.bad_request
|
||||
|
||||
cql.execute(f"DROP KEYSPACE {keyspace}")
|
||||
|
||||
Reference in New Issue
Block a user