Merge "Add support to stop ongoing compaction" from Raphael

"stop compaction is about temporarily interrupting all ongoing compaction
 of a given type.
 That will also be needed for 'nodetool stop <compaction_type>'.

 The test was about starting scylla, stressing it, stopping compaction using
 the API and checking that scylla was able to recover.

 Scylla will print a message as follows for each compaction that was stopped:
 ERROR [shard 0] compaction_manager - compaction failed: read exception:
 std::runtime_error (Compaction for keyspace1/standard1 was deliberately stopped.)
 INFO  [shard 0] compaction_manager - compaction task handler sleeping for 20 seconds"
This commit is contained in:
Pekka Enberg
2016-01-21 18:34:10 +02:00
6 changed files with 85 additions and 38 deletions

View File

@@ -106,7 +106,7 @@
"required":true,
"allowMultiple":false,
"type":"string",
"paramType":"string"
"paramType":"query"
}
]
}

View File

@@ -67,11 +67,14 @@ void set_compaction_manager(http_context& ctx, routes& r) {
return make_ready_future<json::json_return_type>(json_void());
});
cm::stop_compaction.set(r, [] (std::unique_ptr<request> req) {
//TBD
// FIXME
warn(unimplemented::cause::API);
return make_ready_future<json::json_return_type>("");
// REST handler: stops every ongoing compaction whose type matches the "type"
// query parameter (e.g. "COMPACTION" or "CLEANUP") on all shards.
cm::stop_compaction.set(r, [&ctx] (std::unique_ptr<request> req) {
auto type = req->get_query_param("type");
// Compactions run independently on each shard, so the stop request
// must be broadcast to the compaction manager of every shard.
return ctx.db.invoke_on_all([type] (database& db) {
auto& cm = db.get_compaction_manager();
cm.stop_compaction(type);
}).then([] {
// Reply with an empty JSON body once all shards have been notified.
return make_ready_future<json::json_return_type>(json_void());
});
});
cm::get_pending_tasks.set(r, [&ctx] (std::unique_ptr<request> req) {

View File

@@ -112,12 +112,13 @@ future<> compact_sstables(std::vector<shared_sstable> sstables, column_family& c
std::vector<::mutation_reader> readers;
uint64_t estimated_partitions = 0;
auto ancestors = make_lw_shared<std::vector<unsigned long>>();
auto stats = make_lw_shared<compaction_stats>();
auto info = make_lw_shared<compaction_info>();
auto& cm = cf.get_compaction_manager();
sstring sstable_logger_msg = "[";
info->type = (cleanup) ? compaction_type::Cleanup : compaction_type::Compaction;
// register compaction_stats of starting compaction into compaction manager
cm.register_compaction(stats);
cm.register_compaction(info);
assert(sstables.size() > 0);
@@ -141,11 +142,11 @@ future<> compact_sstables(std::vector<shared_sstable> sstables, column_family& c
// for a better estimate for the number of partitions in the merged
// sstable than just adding up the lengths of individual sstables.
estimated_partitions += sst->get_estimated_key_count();
stats->total_partitions += sst->get_estimated_key_count();
info->total_partitions += sst->get_estimated_key_count();
// Compacted sstable keeps track of its ancestors.
ancestors->push_back(sst->generation());
sstable_logger_msg += sprint("%s:level=%d, ", sst->get_filename(), sst->get_sstable_level());
stats->start_size += sst->data_size();
info->start_size += sst->data_size();
// TODO:
// Note that this is not fully correct. Since we might be merging sstables that originated on
// another shard (#cpu changed), we might be comparing RP:s with differing shard ids,
@@ -156,13 +157,13 @@ future<> compact_sstables(std::vector<shared_sstable> sstables, column_family& c
rp = std::max(rp, sst->get_stats_metadata().position);
}
uint64_t estimated_sstables = std::max(1UL, uint64_t(ceil(double(stats->start_size) / max_sstable_size)));
uint64_t estimated_sstables = std::max(1UL, uint64_t(ceil(double(info->start_size) / max_sstable_size)));
uint64_t partitions_per_sstable = ceil(double(estimated_partitions) / estimated_sstables);
sstable_logger_msg += "]";
stats->sstables = sstables.size();
stats->ks = schema->ks_name();
stats->cf = schema->cf_name();
info->sstables = sstables.size();
info->ks = schema->ks_name();
info->cf = schema->cf_name();
logger.info("{} {}", (!cleanup) ? "Compacting" : "Cleaning", sstable_logger_msg);
class compacting_reader final : public ::mutation_reader::impl {
@@ -239,10 +240,14 @@ future<> compact_sstables(std::vector<shared_sstable> sstables, column_family& c
auto output_reader = make_lw_shared<seastar::pipe_reader<mutation>>(std::move(output.reader));
auto output_writer = make_lw_shared<seastar::pipe_writer<mutation>>(std::move(output.writer));
future<> read_done = repeat([output_writer, reader = std::move(reader), stats] () mutable {
return reader().then([output_writer, stats] (auto mopt) {
future<> read_done = repeat([output_writer, reader = std::move(reader), info] () mutable {
if (info->is_stop_requested()) {
// Compaction manager will catch this exception and re-schedule the compaction.
throw std::runtime_error(sprint("Compaction for %s/%s was deliberately stopped.", info->ks, info->cf));
}
return reader().then([output_writer, info] (auto mopt) {
if (mopt) {
stats->total_keys_written++;
info->total_keys_written++;
return output_writer->write(std::move(*mopt)).then([] {
return make_ready_future<stop_iteration>(stop_iteration::no);
});
@@ -264,9 +269,9 @@ future<> compact_sstables(std::vector<shared_sstable> sstables, column_family& c
// If there is a maximum size for a sstable, it's possible that more than
// one sstable will be generated for all partitions to be written.
future<> write_done = repeat([creator, ancestors, rp, max_sstable_size, sstable_level, output_reader, stats, partitions_per_sstable, schema] {
future<> write_done = repeat([creator, ancestors, rp, max_sstable_size, sstable_level, output_reader, info, partitions_per_sstable, schema] {
return output_reader->read().then(
[creator, ancestors, rp, max_sstable_size, sstable_level, output_reader, stats, partitions_per_sstable, schema] (auto mut) {
[creator, ancestors, rp, max_sstable_size, sstable_level, output_reader, info, partitions_per_sstable, schema] (auto mut) {
// Check if mutation is available from the pipe for a new sstable to be written. If not, just stop writing.
if (!mut) {
return make_ready_future<stop_iteration>(stop_iteration::yes);
@@ -283,10 +288,10 @@ future<> compact_sstables(std::vector<shared_sstable> sstables, column_family& c
::mutation_reader mutation_queue_reader = make_mutation_reader<queue_reader>(output_reader);
return newtab->write_components(std::move(mutation_queue_reader), partitions_per_sstable, schema, max_sstable_size).then([newtab, stats] {
return newtab->open_data().then([newtab, stats] {
stats->new_sstables.push_back(newtab);
stats->end_size += newtab->data_size();
return newtab->write_components(std::move(mutation_queue_reader), partitions_per_sstable, schema, max_sstable_size).then([newtab, info] {
return newtab->open_data().then([newtab, info] {
info->new_sstables.push_back(newtab);
info->end_size += newtab->data_size();
return make_ready_future<stop_iteration>(stop_iteration::no);
});
});
@@ -294,9 +299,9 @@ future<> compact_sstables(std::vector<shared_sstable> sstables, column_family& c
}).then([output_reader] {});
// Wait for both read_done and write_done fibers to finish.
return when_all(std::move(read_done), std::move(write_done)).then([&cm, stats] (std::tuple<future<>, future<>> t) {
return when_all(std::move(read_done), std::move(write_done)).then([&cm, info] (std::tuple<future<>, future<>> t) {
// deregister compaction_stats of finished compaction from compaction manager.
cm.deregister_compaction(stats);
cm.deregister_compaction(info);
sstring ex;
try {
@@ -314,15 +319,15 @@ future<> compact_sstables(std::vector<shared_sstable> sstables, column_family& c
if (ex.size()) {
throw std::runtime_error(ex);
}
}).then([start_time, stats, cleanup] {
double ratio = double(stats->end_size) / double(stats->start_size);
}).then([start_time, info, cleanup] {
double ratio = double(info->end_size) / double(info->start_size);
auto end_time = std::chrono::steady_clock::now();
// time taken by compaction in seconds.
auto duration = std::chrono::duration<float>(end_time - start_time);
auto throughput = (double(stats->end_size) / (1024*1024)) / duration.count();
auto throughput = (double(info->end_size) / (1024*1024)) / duration.count();
sstring new_sstables_msg;
for (auto& newtab : stats->new_sstables) {
for (auto& newtab : info->new_sstables) {
new_sstables_msg += sprint("%s:level=%d, ", newtab->get_filename(), newtab->get_sstable_level());
}
@@ -334,9 +339,9 @@ future<> compact_sstables(std::vector<shared_sstable> sstables, column_family& c
logger.info("{} {} sstables to [{}]. {} bytes to {} (~{}% of original) in {}ms = {}MB/s. " \
"~{} total partitions merged to {}.",
(!cleanup) ? "Compacted" : "Cleaned",
stats->sstables, new_sstables_msg, stats->start_size, stats->end_size, (int) (ratio * 100),
info->sstables, new_sstables_msg, info->start_size, info->end_size, (int) (ratio * 100),
std::chrono::duration_cast<std::chrono::milliseconds>(duration).count(), throughput,
stats->total_partitions, stats->total_keys_written);
info->total_partitions, info->total_keys_written);
// If compaction is running for testing purposes, detect that there is
// no query context and skip code that updates compaction history.
@@ -355,8 +360,8 @@ future<> compact_sstables(std::vector<shared_sstable> sstables, column_family& c
// shows how many sstables each row is merged from. This information
// cannot be accessed until we make combined_reader more generic,
// for example, by adding a reducer method.
return db::system_keyspace::update_compaction_history(stats->ks, stats->cf, compacted_at,
stats->start_size, stats->end_size, std::unordered_map<int32_t, int64_t>{});
return db::system_keyspace::update_compaction_history(info->ks, info->cf, compacted_at,
info->start_size, info->end_size, std::unordered_map<int32_t, int64_t>{});
});
}

View File

@@ -46,7 +46,16 @@ namespace sstables {
: sstables(std::move(sstables)) {}
};
struct compaction_stats {
// Kinds of compaction-like operations tracked by the compaction manager.
// NOTE(review): stop_compaction() currently only handles Compaction and
// Cleanup; the remaining values are declared for future use.
enum class compaction_type {
Compaction = 0,
Cleanup = 1,
Validation = 2,
Scrub = 3,
Index_build = 4,
};
struct compaction_info {
compaction_type type;
sstring ks;
sstring cf;
size_t sstables = 0;
@@ -55,6 +64,15 @@ namespace sstables {
uint64_t total_partitions = 0;
uint64_t total_keys_written = 0;
std::vector<shared_sstable> new_sstables;
bool stop_requested = false;
bool is_stop_requested() const {
return stop_requested;
}
void stop() {
stop_requested = true;
}
};
// Compact a list of N sstables into M sstables.

View File

@@ -322,3 +322,21 @@ future<> compaction_manager::remove(column_family* cf) {
});
}).then([tasks_to_stop] {});
}
// Requests a stop of all ongoing compactions of the given type.
// `type` is the type name as used by the REST API / nodetool ("COMPACTION"
// or "CLEANUP", matched case-sensitively); any other name throws
// std::runtime_error. Stopping is cooperative: each matching
// compaction_info has its stop flag set, and the running compaction
// observes the flag and aborts by throwing (the compaction manager then
// catches that failure and re-schedules the compaction).
void compaction_manager::stop_compaction(sstring type) {
// TODO: this method only works for compaction of type compaction and cleanup.
// Other types are: validation, scrub, index_build.
sstables::compaction_type target_type;
if (type == "COMPACTION") {
target_type = sstables::compaction_type::Compaction;
} else if (type == "CLEANUP") {
target_type = sstables::compaction_type::Cleanup;
} else {
throw std::runtime_error(sprint("Compaction of type %s cannot be stopped by compaction manager", type.c_str()));
}
// Flag only the registered compactions of the requested type;
// compactions of other types keep running untouched.
for (auto& info : _compactions) {
if (target_type == info->type) {
info->stop();
}
}
}

View File

@@ -72,7 +72,7 @@ private:
stats _stats;
std::vector<scollectd::registration> _registrations;
std::list<lw_shared_ptr<sstables::compaction_stats>> _compactions;
std::list<lw_shared_ptr<sstables::compaction_info>> _compactions;
private:
void task_start(lw_shared_ptr<task>& task);
future<> task_stop(lw_shared_ptr<task>& task);
@@ -105,16 +105,19 @@ public:
return _stats;
}
void register_compaction(lw_shared_ptr<sstables::compaction_stats> c) {
void register_compaction(lw_shared_ptr<sstables::compaction_info> c) {
_compactions.push_back(c);
}
void deregister_compaction(lw_shared_ptr<sstables::compaction_stats> c) {
void deregister_compaction(lw_shared_ptr<sstables::compaction_info> c) {
_compactions.remove(c);
}
const std::list<lw_shared_ptr<sstables::compaction_stats>>& get_compactions() const {
const std::list<lw_shared_ptr<sstables::compaction_info>>& get_compactions() const {
return _compactions;
}
// Stops ongoing compaction of a given type.
void stop_compaction(sstring type);
};