diff --git a/api/api-doc/compaction_manager.json b/api/api-doc/compaction_manager.json index 109f608d63..edbd652292 100644 --- a/api/api-doc/compaction_manager.json +++ b/api/api-doc/compaction_manager.json @@ -106,7 +106,7 @@ "required":true, "allowMultiple":false, "type":"string", - "paramType":"string" + "paramType":"query" } ] } diff --git a/api/compaction_manager.cc b/api/compaction_manager.cc index 108eca3ba9..768759bfbc 100644 --- a/api/compaction_manager.cc +++ b/api/compaction_manager.cc @@ -67,11 +67,14 @@ void set_compaction_manager(http_context& ctx, routes& r) { return make_ready_future(json_void()); }); - cm::stop_compaction.set(r, [] (std::unique_ptr req) { - //TBD - // FIXME - warn(unimplemented::cause::API); - return make_ready_future(""); + cm::stop_compaction.set(r, [&ctx] (std::unique_ptr req) { + auto type = req->get_query_param("type"); + return ctx.db.invoke_on_all([type] (database& db) { + auto& cm = db.get_compaction_manager(); + cm.stop_compaction(type); + }).then([] { + return make_ready_future(json_void()); + }); }); cm::get_pending_tasks.set(r, [&ctx] (std::unique_ptr req) { diff --git a/sstables/compaction.cc b/sstables/compaction.cc index 7912f9edc1..a2e2708d8e 100644 --- a/sstables/compaction.cc +++ b/sstables/compaction.cc @@ -112,12 +112,13 @@ future<> compact_sstables(std::vector sstables, column_family& c std::vector<::mutation_reader> readers; uint64_t estimated_partitions = 0; auto ancestors = make_lw_shared>(); - auto stats = make_lw_shared(); + auto info = make_lw_shared(); auto& cm = cf.get_compaction_manager(); sstring sstable_logger_msg = "["; + info->type = (cleanup) ? compaction_type::Cleanup : compaction_type::Compaction; // register compaction_stats of starting compaction into compaction manager - cm.register_compaction(stats); + cm.register_compaction(info); assert(sstables.size() > 0); @@ -141,11 +142,11 @@ future<> compact_sstables(std::vector sstables, column_family& c // for a better estimate for the number of partitions in the merged // sstable than just adding up the lengths of individual sstables. estimated_partitions += sst->get_estimated_key_count(); - stats->total_partitions += sst->get_estimated_key_count(); + info->total_partitions += sst->get_estimated_key_count(); // Compacted sstable keeps track of its ancestors. ancestors->push_back(sst->generation()); sstable_logger_msg += sprint("%s:level=%d, ", sst->get_filename(), sst->get_sstable_level()); - stats->start_size += sst->data_size(); + info->start_size += sst->data_size(); // TODO: // Note that this is not fully correct. Since we might be merging sstables that originated on // another shard (#cpu changed), we might be comparing RP:s with differing shard ids, @@ -156,13 +157,13 @@ future<> compact_sstables(std::vector sstables, column_family& c rp = std::max(rp, sst->get_stats_metadata().position); } - uint64_t estimated_sstables = std::max(1UL, uint64_t(ceil(double(stats->start_size) / max_sstable_size))); + uint64_t estimated_sstables = std::max(1UL, uint64_t(ceil(double(info->start_size) / max_sstable_size))); uint64_t partitions_per_sstable = ceil(double(estimated_partitions) / estimated_sstables); sstable_logger_msg += "]"; - stats->sstables = sstables.size(); - stats->ks = schema->ks_name(); - stats->cf = schema->cf_name(); + info->sstables = sstables.size(); + info->ks = schema->ks_name(); + info->cf = schema->cf_name(); logger.info("{} {}", (!cleanup) ? "Compacting" : "Cleaning", sstable_logger_msg); class compacting_reader final : public ::mutation_reader::impl { @@ -239,10 +240,14 @@ future<> compact_sstables(std::vector sstables, column_family& c auto output_reader = make_lw_shared>(std::move(output.reader)); auto output_writer = make_lw_shared>(std::move(output.writer)); - future<> read_done = repeat([output_writer, reader = std::move(reader), stats] () mutable { - return reader().then([output_writer, stats] (auto mopt) { + future<> read_done = repeat([output_writer, reader = std::move(reader), info] () mutable { + if (info->is_stop_requested()) { + // Compaction manager will catch this exception and re-schedule the compaction. + throw std::runtime_error(sprint("Compaction for %s/%s was deliberately stopped.", info->ks, info->cf)); + } + return reader().then([output_writer, info] (auto mopt) { if (mopt) { - stats->total_keys_written++; + info->total_keys_written++; return output_writer->write(std::move(*mopt)).then([] { return make_ready_future(stop_iteration::no); }); @@ -264,9 +269,9 @@ future<> compact_sstables(std::vector sstables, column_family& c // If there is a maximum size for a sstable, it's possible that more than // one sstable will be generated for all partitions to be written. - future<> write_done = repeat([creator, ancestors, rp, max_sstable_size, sstable_level, output_reader, stats, partitions_per_sstable, schema] { + future<> write_done = repeat([creator, ancestors, rp, max_sstable_size, sstable_level, output_reader, info, partitions_per_sstable, schema] { return output_reader->read().then( - [creator, ancestors, rp, max_sstable_size, sstable_level, output_reader, stats, partitions_per_sstable, schema] (auto mut) { + [creator, ancestors, rp, max_sstable_size, sstable_level, output_reader, info, partitions_per_sstable, schema] (auto mut) { // Check if mutation is available from the pipe for a new sstable to be written. If not, just stop writing. if (!mut) { return make_ready_future(stop_iteration::yes); @@ -283,10 +288,10 @@ future<> compact_sstables(std::vector sstables, column_family& c ::mutation_reader mutation_queue_reader = make_mutation_reader(output_reader); - return newtab->write_components(std::move(mutation_queue_reader), partitions_per_sstable, schema, max_sstable_size).then([newtab, stats] { - return newtab->open_data().then([newtab, stats] { - stats->new_sstables.push_back(newtab); - stats->end_size += newtab->data_size(); + return newtab->write_components(std::move(mutation_queue_reader), partitions_per_sstable, schema, max_sstable_size).then([newtab, info] { + return newtab->open_data().then([newtab, info] { + info->new_sstables.push_back(newtab); + info->end_size += newtab->data_size(); return make_ready_future(stop_iteration::no); }); }); @@ -294,9 +299,9 @@ future<> compact_sstables(std::vector sstables, column_family& c }).then([output_reader] {}); // Wait for both read_done and write_done fibers to finish. - return when_all(std::move(read_done), std::move(write_done)).then([&cm, stats] (std::tuple, future<>> t) { + return when_all(std::move(read_done), std::move(write_done)).then([&cm, info] (std::tuple, future<>> t) { // deregister compaction_stats of finished compaction from compaction manager. - cm.deregister_compaction(stats); + cm.deregister_compaction(info); sstring ex; try { @@ -314,15 +319,15 @@ future<> compact_sstables(std::vector sstables, column_family& c if (ex.size()) { throw std::runtime_error(ex); } - }).then([start_time, stats, cleanup] { - double ratio = double(stats->end_size) / double(stats->start_size); + }).then([start_time, info, cleanup] { + double ratio = double(info->end_size) / double(info->start_size); auto end_time = std::chrono::steady_clock::now(); // time taken by compaction in seconds. auto duration = std::chrono::duration(end_time - start_time); - auto throughput = (double(stats->end_size) / (1024*1024)) / duration.count(); + auto throughput = (double(info->end_size) / (1024*1024)) / duration.count(); sstring new_sstables_msg; - for (auto& newtab : stats->new_sstables) { + for (auto& newtab : info->new_sstables) { new_sstables_msg += sprint("%s:level=%d, ", newtab->get_filename(), newtab->get_sstable_level()); } @@ -334,9 +339,9 @@ future<> compact_sstables(std::vector sstables, column_family& c logger.info("{} {} sstables to [{}]. {} bytes to {} (~{}% of original) in {}ms = {}MB/s. " \ "~{} total partitions merged to {}.", (!cleanup) ? "Compacted" : "Cleaned", - stats->sstables, new_sstables_msg, stats->start_size, stats->end_size, (int) (ratio * 100), + info->sstables, new_sstables_msg, info->start_size, info->end_size, (int) (ratio * 100), std::chrono::duration_cast(duration).count(), throughput, - stats->total_partitions, stats->total_keys_written); + info->total_partitions, info->total_keys_written); // If compaction is running for testing purposes, detect that there is // no query context and skip code that updates compaction history. @@ -355,8 +360,8 @@ future<> compact_sstables(std::vector sstables, column_family& c // shows how many sstables each row is merged from. This information // cannot be accessed until we make combined_reader more generic, // for example, by adding a reducer method. - return db::system_keyspace::update_compaction_history(stats->ks, stats->cf, compacted_at, - stats->start_size, stats->end_size, std::unordered_map{}); + return db::system_keyspace::update_compaction_history(info->ks, info->cf, compacted_at, + info->start_size, info->end_size, std::unordered_map{}); }); } diff --git a/sstables/compaction.hh b/sstables/compaction.hh index 5280641c40..aab6fe81e9 100644 --- a/sstables/compaction.hh +++ b/sstables/compaction.hh @@ -46,7 +46,16 @@ namespace sstables { : sstables(std::move(sstables)) {} }; - struct compaction_stats { + enum class compaction_type { + Compaction = 0, + Cleanup = 1, + Validation = 2, + Scrub = 3, + Index_build = 4, + }; + + struct compaction_info { + compaction_type type; sstring ks; sstring cf; size_t sstables = 0; @@ -55,6 +64,15 @@ namespace sstables { uint64_t total_partitions = 0; uint64_t total_keys_written = 0; std::vector new_sstables; + bool stop_requested = false; + + bool is_stop_requested() const { + return stop_requested; + } + + void stop() { + stop_requested = true; + } }; // Compact a list of N sstables into M sstables. diff --git a/utils/compaction_manager.cc b/utils/compaction_manager.cc index b192d2b7e4..eabd18ff86 100644 --- a/utils/compaction_manager.cc +++ b/utils/compaction_manager.cc @@ -322,3 +322,21 @@ future<> compaction_manager::remove(column_family* cf) { }); }).then([tasks_to_stop] {}); } + +void compaction_manager::stop_compaction(sstring type) { + // TODO: this method only works for compaction of type compaction and cleanup. + // Other types are: validation, scrub, index_build. + sstables::compaction_type target_type; + if (type == "COMPACTION") { + target_type = sstables::compaction_type::Compaction; + } else if (type == "CLEANUP") { + target_type = sstables::compaction_type::Cleanup; + } else { + throw std::runtime_error(sprint("Compaction of type %s cannot be stopped by compaction manager", type.c_str())); + } + for (auto& info : _compactions) { + if (target_type == info->type) { + info->stop(); + } + } +} diff --git a/utils/compaction_manager.hh b/utils/compaction_manager.hh index 935b0a0a3f..1775e0514f 100644 --- a/utils/compaction_manager.hh +++ b/utils/compaction_manager.hh @@ -72,7 +72,7 @@ private: stats _stats; std::vector _registrations; - std::list> _compactions; + std::list> _compactions; private: void task_start(lw_shared_ptr& task); future<> task_stop(lw_shared_ptr& task); @@ -105,16 +105,19 @@ public: return _stats; } - void register_compaction(lw_shared_ptr c) { + void register_compaction(lw_shared_ptr c) { _compactions.push_back(c); } - void deregister_compaction(lw_shared_ptr c) { + void deregister_compaction(lw_shared_ptr c) { _compactions.remove(c); } - const std::list>& get_compactions() const { + const std::list>& get_compactions() const { return _compactions; } + + // Stops ongoing compaction of a given type. + void stop_compaction(sstring type); };