Merge "Add support to stop ongoing compaction" from Raphael
"Stop compaction is about temporarily interrupting all ongoing compactions of a given type. It will also be needed for 'nodetool stop <compaction_type>'. The test consisted of starting scylla, stressing it, stopping compaction using the API and checking that scylla was able to recover. Scylla will print messages as follows for each compaction that was stopped: ERROR [shard 0] compaction_manager - compaction failed: read exception: std::runtime_error (Compaction for keyspace1/standard1 was deliberately stopped.) INFO [shard 0] compaction_manager - compaction task handler sleeping for 20 seconds"
This commit is contained in:
@@ -106,7 +106,7 @@
|
||||
"required":true,
|
||||
"allowMultiple":false,
|
||||
"type":"string",
|
||||
"paramType":"string"
|
||||
"paramType":"query"
|
||||
}
|
||||
]
|
||||
}
|
||||
|
||||
@@ -67,11 +67,14 @@ void set_compaction_manager(http_context& ctx, routes& r) {
|
||||
return make_ready_future<json::json_return_type>(json_void());
|
||||
});
|
||||
|
||||
cm::stop_compaction.set(r, [] (std::unique_ptr<request> req) {
|
||||
//TBD
|
||||
// FIXME
|
||||
warn(unimplemented::cause::API);
|
||||
return make_ready_future<json::json_return_type>("");
|
||||
cm::stop_compaction.set(r, [&ctx] (std::unique_ptr<request> req) {
|
||||
auto type = req->get_query_param("type");
|
||||
return ctx.db.invoke_on_all([type] (database& db) {
|
||||
auto& cm = db.get_compaction_manager();
|
||||
cm.stop_compaction(type);
|
||||
}).then([] {
|
||||
return make_ready_future<json::json_return_type>(json_void());
|
||||
});
|
||||
});
|
||||
|
||||
cm::get_pending_tasks.set(r, [&ctx] (std::unique_ptr<request> req) {
|
||||
|
||||
@@ -112,12 +112,13 @@ future<> compact_sstables(std::vector<shared_sstable> sstables, column_family& c
|
||||
std::vector<::mutation_reader> readers;
|
||||
uint64_t estimated_partitions = 0;
|
||||
auto ancestors = make_lw_shared<std::vector<unsigned long>>();
|
||||
auto stats = make_lw_shared<compaction_stats>();
|
||||
auto info = make_lw_shared<compaction_info>();
|
||||
auto& cm = cf.get_compaction_manager();
|
||||
sstring sstable_logger_msg = "[";
|
||||
|
||||
info->type = (cleanup) ? compaction_type::Cleanup : compaction_type::Compaction;
|
||||
// register compaction_stats of starting compaction into compaction manager
|
||||
cm.register_compaction(stats);
|
||||
cm.register_compaction(info);
|
||||
|
||||
assert(sstables.size() > 0);
|
||||
|
||||
@@ -141,11 +142,11 @@ future<> compact_sstables(std::vector<shared_sstable> sstables, column_family& c
|
||||
// for a better estimate for the number of partitions in the merged
|
||||
// sstable than just adding up the lengths of individual sstables.
|
||||
estimated_partitions += sst->get_estimated_key_count();
|
||||
stats->total_partitions += sst->get_estimated_key_count();
|
||||
info->total_partitions += sst->get_estimated_key_count();
|
||||
// Compacted sstable keeps track of its ancestors.
|
||||
ancestors->push_back(sst->generation());
|
||||
sstable_logger_msg += sprint("%s:level=%d, ", sst->get_filename(), sst->get_sstable_level());
|
||||
stats->start_size += sst->data_size();
|
||||
info->start_size += sst->data_size();
|
||||
// TODO:
|
||||
// Note that this is not fully correct. Since we might be merging sstables that originated on
|
||||
// another shard (#cpu changed), we might be comparing RP:s with differing shard ids,
|
||||
@@ -156,13 +157,13 @@ future<> compact_sstables(std::vector<shared_sstable> sstables, column_family& c
|
||||
rp = std::max(rp, sst->get_stats_metadata().position);
|
||||
}
|
||||
|
||||
uint64_t estimated_sstables = std::max(1UL, uint64_t(ceil(double(stats->start_size) / max_sstable_size)));
|
||||
uint64_t estimated_sstables = std::max(1UL, uint64_t(ceil(double(info->start_size) / max_sstable_size)));
|
||||
uint64_t partitions_per_sstable = ceil(double(estimated_partitions) / estimated_sstables);
|
||||
|
||||
sstable_logger_msg += "]";
|
||||
stats->sstables = sstables.size();
|
||||
stats->ks = schema->ks_name();
|
||||
stats->cf = schema->cf_name();
|
||||
info->sstables = sstables.size();
|
||||
info->ks = schema->ks_name();
|
||||
info->cf = schema->cf_name();
|
||||
logger.info("{} {}", (!cleanup) ? "Compacting" : "Cleaning", sstable_logger_msg);
|
||||
|
||||
class compacting_reader final : public ::mutation_reader::impl {
|
||||
@@ -239,10 +240,14 @@ future<> compact_sstables(std::vector<shared_sstable> sstables, column_family& c
|
||||
auto output_reader = make_lw_shared<seastar::pipe_reader<mutation>>(std::move(output.reader));
|
||||
auto output_writer = make_lw_shared<seastar::pipe_writer<mutation>>(std::move(output.writer));
|
||||
|
||||
future<> read_done = repeat([output_writer, reader = std::move(reader), stats] () mutable {
|
||||
return reader().then([output_writer, stats] (auto mopt) {
|
||||
future<> read_done = repeat([output_writer, reader = std::move(reader), info] () mutable {
|
||||
if (info->is_stop_requested()) {
|
||||
// Compaction manager will catch this exception and re-schedule the compaction.
|
||||
throw std::runtime_error(sprint("Compaction for %s/%s was deliberately stopped.", info->ks, info->cf));
|
||||
}
|
||||
return reader().then([output_writer, info] (auto mopt) {
|
||||
if (mopt) {
|
||||
stats->total_keys_written++;
|
||||
info->total_keys_written++;
|
||||
return output_writer->write(std::move(*mopt)).then([] {
|
||||
return make_ready_future<stop_iteration>(stop_iteration::no);
|
||||
});
|
||||
@@ -264,9 +269,9 @@ future<> compact_sstables(std::vector<shared_sstable> sstables, column_family& c
|
||||
|
||||
// If there is a maximum size for a sstable, it's possible that more than
|
||||
// one sstable will be generated for all partitions to be written.
|
||||
future<> write_done = repeat([creator, ancestors, rp, max_sstable_size, sstable_level, output_reader, stats, partitions_per_sstable, schema] {
|
||||
future<> write_done = repeat([creator, ancestors, rp, max_sstable_size, sstable_level, output_reader, info, partitions_per_sstable, schema] {
|
||||
return output_reader->read().then(
|
||||
[creator, ancestors, rp, max_sstable_size, sstable_level, output_reader, stats, partitions_per_sstable, schema] (auto mut) {
|
||||
[creator, ancestors, rp, max_sstable_size, sstable_level, output_reader, info, partitions_per_sstable, schema] (auto mut) {
|
||||
// Check if mutation is available from the pipe for a new sstable to be written. If not, just stop writing.
|
||||
if (!mut) {
|
||||
return make_ready_future<stop_iteration>(stop_iteration::yes);
|
||||
@@ -283,10 +288,10 @@ future<> compact_sstables(std::vector<shared_sstable> sstables, column_family& c
|
||||
|
||||
::mutation_reader mutation_queue_reader = make_mutation_reader<queue_reader>(output_reader);
|
||||
|
||||
return newtab->write_components(std::move(mutation_queue_reader), partitions_per_sstable, schema, max_sstable_size).then([newtab, stats] {
|
||||
return newtab->open_data().then([newtab, stats] {
|
||||
stats->new_sstables.push_back(newtab);
|
||||
stats->end_size += newtab->data_size();
|
||||
return newtab->write_components(std::move(mutation_queue_reader), partitions_per_sstable, schema, max_sstable_size).then([newtab, info] {
|
||||
return newtab->open_data().then([newtab, info] {
|
||||
info->new_sstables.push_back(newtab);
|
||||
info->end_size += newtab->data_size();
|
||||
return make_ready_future<stop_iteration>(stop_iteration::no);
|
||||
});
|
||||
});
|
||||
@@ -294,9 +299,9 @@ future<> compact_sstables(std::vector<shared_sstable> sstables, column_family& c
|
||||
}).then([output_reader] {});
|
||||
|
||||
// Wait for both read_done and write_done fibers to finish.
|
||||
return when_all(std::move(read_done), std::move(write_done)).then([&cm, stats] (std::tuple<future<>, future<>> t) {
|
||||
return when_all(std::move(read_done), std::move(write_done)).then([&cm, info] (std::tuple<future<>, future<>> t) {
|
||||
// deregister compaction_stats of finished compaction from compaction manager.
|
||||
cm.deregister_compaction(stats);
|
||||
cm.deregister_compaction(info);
|
||||
|
||||
sstring ex;
|
||||
try {
|
||||
@@ -314,15 +319,15 @@ future<> compact_sstables(std::vector<shared_sstable> sstables, column_family& c
|
||||
if (ex.size()) {
|
||||
throw std::runtime_error(ex);
|
||||
}
|
||||
}).then([start_time, stats, cleanup] {
|
||||
double ratio = double(stats->end_size) / double(stats->start_size);
|
||||
}).then([start_time, info, cleanup] {
|
||||
double ratio = double(info->end_size) / double(info->start_size);
|
||||
auto end_time = std::chrono::steady_clock::now();
|
||||
// time taken by compaction in seconds.
|
||||
auto duration = std::chrono::duration<float>(end_time - start_time);
|
||||
auto throughput = (double(stats->end_size) / (1024*1024)) / duration.count();
|
||||
auto throughput = (double(info->end_size) / (1024*1024)) / duration.count();
|
||||
sstring new_sstables_msg;
|
||||
|
||||
for (auto& newtab : stats->new_sstables) {
|
||||
for (auto& newtab : info->new_sstables) {
|
||||
new_sstables_msg += sprint("%s:level=%d, ", newtab->get_filename(), newtab->get_sstable_level());
|
||||
}
|
||||
|
||||
@@ -334,9 +339,9 @@ future<> compact_sstables(std::vector<shared_sstable> sstables, column_family& c
|
||||
logger.info("{} {} sstables to [{}]. {} bytes to {} (~{}% of original) in {}ms = {}MB/s. " \
|
||||
"~{} total partitions merged to {}.",
|
||||
(!cleanup) ? "Compacted" : "Cleaned",
|
||||
stats->sstables, new_sstables_msg, stats->start_size, stats->end_size, (int) (ratio * 100),
|
||||
info->sstables, new_sstables_msg, info->start_size, info->end_size, (int) (ratio * 100),
|
||||
std::chrono::duration_cast<std::chrono::milliseconds>(duration).count(), throughput,
|
||||
stats->total_partitions, stats->total_keys_written);
|
||||
info->total_partitions, info->total_keys_written);
|
||||
|
||||
// If compaction is running for testing purposes, detect that there is
|
||||
// no query context and skip code that updates compaction history.
|
||||
@@ -355,8 +360,8 @@ future<> compact_sstables(std::vector<shared_sstable> sstables, column_family& c
|
||||
// shows how many sstables each row is merged from. This information
|
||||
// cannot be accessed until we make combined_reader more generic,
|
||||
// for example, by adding a reducer method.
|
||||
return db::system_keyspace::update_compaction_history(stats->ks, stats->cf, compacted_at,
|
||||
stats->start_size, stats->end_size, std::unordered_map<int32_t, int64_t>{});
|
||||
return db::system_keyspace::update_compaction_history(info->ks, info->cf, compacted_at,
|
||||
info->start_size, info->end_size, std::unordered_map<int32_t, int64_t>{});
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
@@ -46,7 +46,16 @@ namespace sstables {
|
||||
: sstables(std::move(sstables)) {}
|
||||
};
|
||||
|
||||
struct compaction_stats {
|
||||
// Identifies the kind of operation a compaction_info describes.
// In the code visible in this change, compact_sstables() only ever assigns
// Compaction or Cleanup, and compaction_manager::stop_compaction() only
// accepts those two; Validation, Scrub and Index_build are declared but not
// used by anything shown here.
enum class compaction_type {
|
||||
Compaction = 0,
|
||||
Cleanup = 1,
|
||||
Validation = 2,
|
||||
Scrub = 3,
|
||||
Index_build = 4,
|
||||
};
|
||||
|
||||
struct compaction_info {
|
||||
compaction_type type;
|
||||
sstring ks;
|
||||
sstring cf;
|
||||
size_t sstables = 0;
|
||||
@@ -55,6 +64,15 @@ namespace sstables {
|
||||
uint64_t total_partitions = 0;
|
||||
uint64_t total_keys_written = 0;
|
||||
std::vector<shared_sstable> new_sstables;
|
||||
bool stop_requested = false;
|
||||
|
||||
bool is_stop_requested() const {
|
||||
return stop_requested;
|
||||
}
|
||||
|
||||
void stop() {
|
||||
stop_requested = true;
|
||||
}
|
||||
};
|
||||
|
||||
// Compact a list of N sstables into M sstables.
|
||||
|
||||
@@ -322,3 +322,21 @@ future<> compaction_manager::remove(column_family* cf) {
|
||||
});
|
||||
}).then([tasks_to_stop] {});
|
||||
}
|
||||
|
||||
// Requests a stop of every compaction currently registered with this manager
// whose type matches `type`.
//
// `type` must be exactly "COMPACTION" or "CLEANUP"; any other value makes the
// function throw std::runtime_error before any compaction is touched.
//
// The function only calls stop() on each matching compaction_info; it does not
// wait for the compaction itself to react to the request.
void compaction_manager::stop_compaction(sstring type) {
    // TODO: this method only works for compaction of type compaction and cleanup.
    // Other types are: validation, scrub, index_build.

    // Translate the textual type into its enum value, rejecting anything that
    // this manager does not know how to stop.
    const auto wanted = [&type] {
        if (type == "COMPACTION") {
            return sstables::compaction_type::Compaction;
        }
        if (type == "CLEANUP") {
            return sstables::compaction_type::Cleanup;
        }
        throw std::runtime_error(sprint("Compaction of type %s cannot be stopped by compaction manager", type.c_str()));
    }();

    // Flag every registered compaction of the requested type.
    for (auto& running : _compactions) {
        if (running->type != wanted) {
            continue;
        }
        running->stop();
    }
}
|
||||
|
||||
@@ -72,7 +72,7 @@ private:
|
||||
stats _stats;
|
||||
std::vector<scollectd::registration> _registrations;
|
||||
|
||||
std::list<lw_shared_ptr<sstables::compaction_stats>> _compactions;
|
||||
std::list<lw_shared_ptr<sstables::compaction_info>> _compactions;
|
||||
private:
|
||||
void task_start(lw_shared_ptr<task>& task);
|
||||
future<> task_stop(lw_shared_ptr<task>& task);
|
||||
@@ -105,16 +105,19 @@ public:
|
||||
return _stats;
|
||||
}
|
||||
|
||||
void register_compaction(lw_shared_ptr<sstables::compaction_stats> c) {
|
||||
// Appends `c` to the end of _compactions, the list returned by
// get_compactions(). No check for duplicates is made here.
void register_compaction(lw_shared_ptr<sstables::compaction_info> c) {
|
||||
_compactions.push_back(c);
|
||||
}
|
||||
|
||||
void deregister_compaction(lw_shared_ptr<sstables::compaction_stats> c) {
|
||||
// Removes from _compactions every entry that compares equal to `c`
// (std::list::remove semantics). It is a no-op if `c` is not in the list.
void deregister_compaction(lw_shared_ptr<sstables::compaction_info> c) {
|
||||
_compactions.remove(c);
|
||||
}
|
||||
|
||||
const std::list<lw_shared_ptr<sstables::compaction_stats>>& get_compactions() const {
|
||||
// Returns a read-only reference to the list of compaction_info objects added
// through register_compaction() and not yet removed by deregister_compaction().
// The reference is to the manager's own list, not a copy.
const std::list<lw_shared_ptr<sstables::compaction_info>>& get_compactions() const {
|
||||
return _compactions;
|
||||
}
|
||||
|
||||
// Stops ongoing compaction of a given type.
|
||||
void stop_compaction(sstring type);
|
||||
};
|
||||
|
||||
|
||||
Reference in New Issue
Block a user