scrub compaction: count validation errors and return status over the rest api

When performing a compaction scrub, the user had no way of knowing
whether any validation errors were encountered.

The number of validation errors for a given compaction scrub is gathered
and summed across all shards. Based on that value, the return status over
the REST API is set to 3 if any validation errors were encountered.
This commit is contained in:
Aleksandra Martyniuk
2022-07-27 14:00:52 +02:00
parent 7d457cffb8
commit f1980f8dc6
3 changed files with 46 additions and 23 deletions

View File

@@ -11,6 +11,7 @@
#include "db/config.hh"
#include "db/schema_tables.hh"
#include "utils/hash.hh"
#include <optional>
#include <sstream>
#include <time.h>
#include <algorithm>
@@ -1266,6 +1267,13 @@ void set_storage_service(http_context& ctx, routes& r, sharded<service::storage_
});
}
// Status code reported over the REST API for a scrub request.
// The numeric values are part of the API contract, so each one is spelled out
// explicitly rather than relying on implicit enumerator numbering.
enum class scrub_status {
    // No validation errors were encountered.
    successful = 0,
    aborted = 1,
    // Not used in Scylla, included to ensure compatibility with nodetool api.
    unable_to_cancel = 2,
    // At least one validation error was encountered during the scrub.
    validation_errors = 3,
};
void set_snapshot(http_context& ctx, routes& r, sharded<db::snapshot_ctl>& snap_ctl) {
ss::get_snapshot_details.set(r, [&snap_ctl](std::unique_ptr<request> req) {
return snap_ctl.local().get_snapshot_details().then([] (std::unordered_map<sstring, std::vector<db::snapshot_ctl::snapshot_details>>&& result) {
@@ -1409,16 +1417,28 @@ void set_snapshot(http_context& ctx, routes& r, sharded<db::snapshot_ctl>& snap_
} else {
throw httpd::bad_param_exception(fmt::format("Unknown argument for 'quarantine_mode' parameter: {}", quarantine_mode_str));
}
return f.then([&ctx, keyspace, column_families, opts] {
return ctx.db.invoke_on_all([=] (replica::database& db) {
return do_for_each(column_families, [=, &db](sstring cfname) {
const auto& reduce_compaction_stats = [] (const compaction_manager::compaction_stats_opt& lhs, const compaction_manager::compaction_stats_opt& rhs) {
sstables::compaction_stats stats{};
stats += lhs.value();
stats += rhs.value();
return stats;
};
return f.then([&ctx, keyspace, column_families, opts, &reduce_compaction_stats] {
return ctx.db.map_reduce0([=] (replica::database& db) {
return map_reduce(column_families, [=, &db] (sstring cfname) {
auto& cm = db.get_compaction_manager();
auto& cf = db.find_column_family(keyspace, cfname);
return cm.perform_sstable_scrub(cf.as_table_state(), opts);
});
});
}).then([]{
return make_ready_future<json::json_return_type>(0);
}, std::make_optional(sstables::compaction_stats{}), reduce_compaction_stats);
}, std::make_optional(sstables::compaction_stats{}), reduce_compaction_stats);
}).then([] (auto f) {
if (f->validation_errors) {
return make_ready_future<json::json_return_type>(static_cast<int>(scrub_status::validation_errors));
} else {
return make_ready_future<json::json_return_type>(static_cast<int>(scrub_status::successful));
}
});
});
}

View File

@@ -277,7 +277,7 @@ compaction_manager::task::task(compaction_manager& mgr, compaction::table_state*
, _description(std::move(desc))
{}
future<> compaction_manager::perform_task(shared_ptr<compaction_manager::task> task) {
future<compaction_manager::compaction_stats_opt> compaction_manager::perform_task(shared_ptr<compaction_manager::task> task) {
_tasks.push_back(task);
auto unregister_task = defer([this, task] {
_tasks.remove(task);
@@ -285,8 +285,9 @@ future<> compaction_manager::perform_task(shared_ptr<compaction_manager::task> t
cmlog.debug("{}: started", *task);
try {
co_await task->run();
auto&& res = co_await task->run();
cmlog.debug("{}: done", *task);
co_return res;
} catch (sstables::compaction_stopped_exception& e) {
cmlog.info("{}: stopped, reason: {}", *task, e.what());
} catch (sstables::compaction_aborted_exception& e) {
@@ -303,6 +304,8 @@ future<> compaction_manager::perform_task(shared_ptr<compaction_manager::task> t
_stats.errors++;
throw;
}
co_return std::nullopt;
}
future<sstables::compaction_result> compaction_manager::task::compact_sstables_and_update_history(sstables::compaction_descriptor descriptor, sstables::compaction_data& cdata, release_exhausted_func_t release_exhausted, can_purge_tombstones can_purge) {
@@ -401,7 +404,7 @@ future<> compaction_manager::perform_major_compaction(compaction::table_state& t
if (_state != state::enabled) {
return make_ready_future<>();
}
return perform_task(make_shared<major_compaction_task>(*this, &t));
return perform_task(make_shared<major_compaction_task>(*this, &t)).discard_result();;
}
class compaction_manager::custom_compaction_task : public compaction_manager::task {
@@ -441,7 +444,7 @@ future<> compaction_manager::run_custom_job(compaction::table_state& t, sstables
return make_ready_future<>();
}
return perform_task(make_shared<custom_compaction_task>(*this, &t, type, desc, std::move(job)));
return perform_task(make_shared<custom_compaction_task>(*this, &t, type, desc, std::move(job))).discard_result();
}
future<> compaction_manager::update_static_shares(float static_shares) {
@@ -1252,9 +1255,9 @@ private:
template<typename TaskType, typename... Args>
requires std::derived_from<TaskType, compaction_manager::task>
future<> compaction_manager::perform_task_on_all_files(compaction::table_state& t, sstables::compaction_type_options options, get_candidates_func get_func, Args... args) {
future<compaction_manager::compaction_stats_opt> compaction_manager::perform_task_on_all_files(compaction::table_state& t, sstables::compaction_type_options options, get_candidates_func get_func, Args... args) {
if (_state != state::enabled) {
co_return;
co_return std::nullopt;
}
// since we might potentially have ongoing compactions, and we
@@ -1276,10 +1279,10 @@ future<> compaction_manager::perform_task_on_all_files(compaction::table_state&
return a->data_size() > b->data_size();
});
});
co_await perform_task(seastar::make_shared<TaskType>(*this, &t, std::move(options), std::move(sstables), std::move(compacting), std::forward<Args>(args)...));
co_return co_await perform_task(seastar::make_shared<TaskType>(*this, &t, std::move(options), std::move(sstables), std::move(compacting), std::forward<Args>(args)...));
}
future<> compaction_manager::rewrite_sstables(compaction::table_state& t, sstables::compaction_type_options options, get_candidates_func get_func, can_purge_tombstones can_purge) {
future<compaction_manager::compaction_stats_opt> compaction_manager::rewrite_sstables(compaction::table_state& t, sstables::compaction_type_options options, get_candidates_func get_func, can_purge_tombstones can_purge) {
return perform_task_on_all_files<rewrite_sstables_compaction_task>(t, std::move(options), std::move(get_func), can_purge);
}
@@ -1344,9 +1347,9 @@ static std::vector<sstables::shared_sstable> get_all_sstables(compaction::table_
return s;
}
future<> compaction_manager::perform_sstable_scrub_validate_mode(compaction::table_state& t) {
future<compaction_manager::compaction_stats_opt> compaction_manager::perform_sstable_scrub_validate_mode(compaction::table_state& t) {
if (_state != state::enabled) {
return make_ready_future<>();
return make_ready_future<compaction_manager::compaction_stats_opt>();
}
// All sstables must be included, even the ones being compacted, such that everything in table is validated.
auto all_sstables = get_all_sstables(t);
@@ -1505,7 +1508,7 @@ future<> compaction_manager::perform_sstable_upgrade(replica::database& db, comp
}
// Submit a table to be scrubbed and wait for its termination.
future<> compaction_manager::perform_sstable_scrub(compaction::table_state& t, sstables::compaction_type_options::scrub opts) {
future<compaction_manager::compaction_stats_opt> compaction_manager::perform_sstable_scrub(compaction::table_state& t, sstables::compaction_type_options::scrub opts) {
auto scrub_mode = opts.operation_mode;
if (scrub_mode == sstables::compaction_type_options::scrub::mode::validate) {
return perform_sstable_scrub_validate_mode(t);

View File

@@ -299,7 +299,7 @@ private:
class strategy_control;
std::unique_ptr<strategy_control> _strategy_control;
private:
future<> perform_task(shared_ptr<task>);
future<compaction_stats_opt> perform_task(shared_ptr<task>);
future<> stop_tasks(std::vector<shared_ptr<task>> tasks, sstring reason);
future<> update_throughput(uint32_t value_mbs);
@@ -340,7 +340,7 @@ private:
// similar-sized compaction.
void postpone_compaction_for_table(compaction::table_state* t);
future<> perform_sstable_scrub_validate_mode(compaction::table_state& t);
future<compaction_stats_opt> perform_sstable_scrub_validate_mode(compaction::table_state& t);
future<> update_static_shares(float shares);
using get_candidates_func = std::function<future<std::vector<sstables::shared_sstable>>()>;
@@ -349,9 +349,9 @@ private:
// by retrieving set of candidates only after all compactions for table T were stopped, if any.
template<typename TaskType, typename... Args>
requires std::derived_from<TaskType, task>
future<> perform_task_on_all_files(compaction::table_state& t, sstables::compaction_type_options options, get_candidates_func, Args... args);
future<compaction_stats_opt> perform_task_on_all_files(compaction::table_state& t, sstables::compaction_type_options options, get_candidates_func, Args... args);
future<> rewrite_sstables(compaction::table_state& t, sstables::compaction_type_options options, get_candidates_func, can_purge_tombstones can_purge = can_purge_tombstones::yes);
future<compaction_stats_opt> rewrite_sstables(compaction::table_state& t, sstables::compaction_type_options options, get_candidates_func, can_purge_tombstones can_purge = can_purge_tombstones::yes);
// Stop all fibers, without waiting. Safe to be called multiple times.
void do_stop() noexcept;
@@ -411,7 +411,7 @@ public:
future<> perform_sstable_upgrade(replica::database& db, compaction::table_state& t, bool exclude_current_version);
// Submit a table to be scrubbed and wait for its termination.
future<> perform_sstable_scrub(compaction::table_state& t, sstables::compaction_type_options::scrub opts);
future<compaction_stats_opt> perform_sstable_scrub(compaction::table_state& t, sstables::compaction_type_options::scrub opts);
// Submit a table for major compaction.
future<> perform_major_compaction(compaction::table_state& t);