diff --git a/api/storage_service.cc b/api/storage_service.cc index 7389024a40..3190222667 100644 --- a/api/storage_service.cc +++ b/api/storage_service.cc @@ -229,10 +229,30 @@ void set_storage_service(http_context& ctx, routes& r) { ss::repair_async.set(r, [&ctx](std::unique_ptr req) { auto keyspace = req->param["keyspace"]; + // FIXME: the "id" and "options" parameters are mutually exclusive, + // and lead to different operations. It would have made sense to make + // these separate requests (with a different method and/or url). auto options = req->get_query_param("options"); - // FIXME: "id" is only needed in a request for information about - // a previously-started repare. In that case, "options" is not needed. auto id = req->get_query_param("id"); + + if (!id.empty()) { + return repair_get_status(ctx.db, boost::lexical_cast(id)) + .then_wrapped([] (future&& fut) { + try { + repair_status s = fut.get0(); + sstring ret; + switch(s) { + case repair_status::RUNNING: ret="RUNNING"; break; + case repair_status::SUCCESSFUL: ret="SUCCESSFUL"; break; + case repair_status::FAILED: ret="FAILED"; break; + } + return make_ready_future(ret); + } catch(std::runtime_error& e) { + throw httpd::bad_param_exception(e.what()); + } + }); + } + // Currently, we get all the repair options encoded in a single // "options" option, and split it to a map using the "," and ":" // delimiters. TODO: consider if it doesn't make more sense to just diff --git a/repair/repair.cc b/repair/repair.cc index 8e06b22336..c349d17453 100644 --- a/repair/repair.cc +++ b/repair/repair.cc @@ -116,9 +116,45 @@ static std::vector get_neighbors(database& db, } // Each repair_start() call returns a unique integer which the user can later -// use to follow the status of this repair. +// use to follow the status of this repair with the repair_status() function. static std::atomic next_repair_command {0}; +// The repair_tracker tracks ongoing repair operations and their progress. +// A repair which has already finished successfully is dropped from this +// table, but a failed repair will remain in the table forever so it can +// be queried about more than once (FIXME: reconsider this. But note that +// failed repairs should be rare anwyay). +// This object is not thread safe, and must be used by only one cpu. +static class { +private: + // Note that there are no "SUCCESSFUL" entries in the "status" map: + // Successfully-finished repairs are those with id < next_repair_command + // but aren't listed as running or failed the status map. + std::unordered_map status; +public: + void start(int id) { + status[id] = repair_status::RUNNING; + } + void done(int id, bool succeeded) { + if (succeeded) { + status.erase(id); + } else { + status[id] = repair_status::FAILED; + } + } + repair_status get(int id) { + if (id >= next_repair_command.load(std::memory_order_relaxed)) { + throw std::runtime_error(sprint("unknown repair id %d", id)); + } + auto it = status.find(id); + if (it == status.end()) { + return repair_status::SUCCESSFUL; + } else { + return it->second; + } + } +} repair_tracker; + // repair_start() can run on any cpu; It runs on cpu0 the function // do_repair_start(). The benefit of always running that function on the same // CPU is that it allows us to keep some state (like a list of ongoing @@ -128,9 +164,9 @@ static std::atomic next_repair_command {0}; // Repair a single range. Comparable to RepairSession in Origin // In Origin, this is composed of several "repair jobs", each with one cf, // but our streaming already works for several cfs. -static void repair_range(seastar::sharded& db, sstring keyspace, +static future<> repair_range(seastar::sharded& db, sstring keyspace, query::range range, std::vector cfs) { - auto sp = streaming::stream_plan("repair"); + auto sp = make_lw_shared("repair"); auto id = utils::UUID_gen::get_time_UUID(); auto neighbors = get_neighbors(db.local(), keyspace, range); @@ -140,12 +176,15 @@ static void repair_range(seastar::sharded& db, sstring keyspace, // request ranges from all of them and only later transfer ranges to // all of them? Otherwise, we won't necessarily fully repair the // other ndoes, just this one? What does Cassandra do here? - sp.transfer_ranges(peer, peer, keyspace, {range}, cfs); - sp.request_ranges(peer, peer, keyspace, {range}, cfs); - sp.execute().discard_result().handle_exception([id] (auto ep) { - logger.error("repair session #{} stream failed: {}", id, ep); - }); + sp->transfer_ranges(peer, peer, keyspace, {range}, cfs); + sp->request_ranges(peer, peer, keyspace, {range}, cfs); } + return sp->execute().discard_result().then([sp, id] { + logger.info("repair session #{} successful", id); + }).handle_exception([id] (auto ep) { + logger.error("repair session #{} stream failed: {}", id, ep); + return make_exception_future(std::runtime_error("repair_range failed")); + }); } static std::vector> get_ranges_for_endpoint( @@ -161,10 +200,12 @@ static std::vector> get_local_ranges( static void do_repair_start(seastar::sharded& db, sstring keyspace, - std::unordered_map options) { + std::unordered_map options, int id) { logger.info("starting user-requested repair for keyspace {}", keyspace); + repair_tracker.start(id); + // If the "ranges" option is not explicitly specified, we repair all the // local ranges (the token ranges for which this node holds a replica of). // Each of these ranges may have a different set of replicas, so the @@ -192,16 +233,47 @@ static void do_repair_start(seastar::sharded& db, sstring keyspace, // FIXME: let the cfs be overriden by an option std::vector cfs = list_column_families(db.local(), keyspace); +#if 1 + // repair all the ranges in parallel + auto done = make_lw_shared(0); + auto success = make_lw_shared(true); for (auto range : ranges) { - repair_range(db, keyspace, range, cfs); + repair_range(db, keyspace, range, cfs). + handle_exception([success] (std::exception_ptr eptr) + { *success = false; }). + finally([done] { done->signal(); }); } + done->wait(ranges.size()).then([done, id, success] { + logger.info("repair {} complete, success={}", id, success); + repair_tracker.done(id, *success); + }); +#else + // repair all the ranges in sequence + do_with(std::move(ranges), [&db, keyspace, cfs, id] (auto& ranges) { + return do_for_each(ranges.begin(), ranges.end(), [&db, keyspace, cfs, id] (auto&& range) { + return repair_range(db, keyspace, range, cfs); + }).then([id] { + logger.info("repair {} completed sucessfully", id); + repair_tracker.done(id, true); + }).handle_exception([id] (std::exception_ptr eptr) { + logger.info("repair {} failed", id); + repair_tracker.done(id, false); + }); + }); +#endif } int repair_start(seastar::sharded& db, sstring keyspace, std::unordered_map options) { int i = next_repair_command++; - db.invoke_on(0, [&db, keyspace = std::move(keyspace), options = std::move(options)] (database& localdb) { - do_repair_start(db, std::move(keyspace), std::move(options)); + db.invoke_on(0, [i, &db, keyspace = std::move(keyspace), options = std::move(options)] (database& localdb) { + do_repair_start(db, std::move(keyspace), std::move(options), i); }); // Note we ignore the value of this future return i; } + +future repair_get_status(seastar::sharded& db, int id) { + return db.invoke_on(0, [id] (database& localdb) { + return repair_tracker.get(id); + }); +} diff --git a/repair/repair.hh b/repair/repair.hh index afe4f276a0..ccc843290a 100644 --- a/repair/repair.hh +++ b/repair/repair.hh @@ -27,3 +27,11 @@ public: // operation. int repair_start(seastar::sharded& db, sstring keyspace, std::unordered_map options); + +// TODO: Have repair_progress contains a percentage progress estimator +// instead of just "RUNNING". +enum class repair_status { RUNNING, SUCCESSFUL, FAILED }; + +// repair_get_status() returns a future because it needs to run code on a +// different CPU (cpu 0) and that might be a deferring operation. +future repair_get_status(seastar::sharded& db, int id);