From 5a02eeaba9ae341bd2c59a45eaf23df50bb415ac Mon Sep 17 00:00:00 2001 From: Nadav Har'El Date: Sun, 16 Aug 2015 01:36:49 +0300 Subject: [PATCH] v2: repair: track ongoing repairs [in v2: 1. Fixed a few small bugs. 2. Added rudimentary support for parallel/sequential repair. 3. Verified that code works correctly with Asias's fix to streaming] This patch adds the capability to track repair operations which we have started, and check whether they are still running or completed (successfully or unsuccessfully). As before one starts a repair with the REST api: curl -X GET --header "Content-Type: application/json" --header "Accept: application/json" "http://127.0.0.1:10000/storage_service/repair_async/try1" where "try1" is the name of the keyspace. This returns a repair id - a small integer starting with 0. This patch adds support for a similar request to *query* the status of a previously started repair, by adding the "id=..." option to the query, which enquires about the status of the repair with this id: For example, curl -i -X GET --header "Content-Type: application/json" --header "Accept: application/json" "http://127.0.0.1:10000/storage_service/repair_async/try1?id=0" gets the current status of this repair 0. This status can be RUNNING, SUCCESSFUL or FAILED, or an HTTP 400 "unknown repair id ..." in case an invalid id is passed (not the id of any real repair that was previously started). This patch also adds two alternative code-paths in the main repair flow do_repair_start(): One where each range is repaired one after another, and one where all the ranges are repaired in parallel. At the moment, the enabled code is the parallel version, just as before this patch. But this will also be useful for implementing the "parallel" vs "sequential" repair options of Cassandra. Note that if you try to use repair, you are likely to run into a bug in the streaming code which results in Scylla either crashing or a repair hanging (never realising it finished). 
Asias already has a fix for this bug, and will hopefully publish it soon, but it is unrelated to the repair code so I think this patch can independently be committed. Signed-off-by: Nadav Har'El --- api/storage_service.cc | 24 ++++++++++- repair/repair.cc | 96 ++++++++++++++++++++++++++++++++++++------ repair/repair.hh | 8 ++++ 3 files changed, 114 insertions(+), 14 deletions(-) diff --git a/api/storage_service.cc b/api/storage_service.cc index 7389024a40..3190222667 100644 --- a/api/storage_service.cc +++ b/api/storage_service.cc @@ -229,10 +229,30 @@ void set_storage_service(http_context& ctx, routes& r) { ss::repair_async.set(r, [&ctx](std::unique_ptr req) { auto keyspace = req->param["keyspace"]; + // FIXME: the "id" and "options" parameters are mutually exclusive, + // and lead to different operations. It would have made sense to make + // these separate requests (with a different method and/or url). auto options = req->get_query_param("options"); - // FIXME: "id" is only needed in a request for information about - // a previously-started repare. In that case, "options" is not needed. auto id = req->get_query_param("id"); + + if (!id.empty()) { + return repair_get_status(ctx.db, boost::lexical_cast(id)) + .then_wrapped([] (future&& fut) { + try { + repair_status s = fut.get0(); + sstring ret; + switch(s) { + case repair_status::RUNNING: ret="RUNNING"; break; + case repair_status::SUCCESSFUL: ret="SUCCESSFUL"; break; + case repair_status::FAILED: ret="FAILED"; break; + } + return make_ready_future(ret); + } catch(std::runtime_error& e) { + throw httpd::bad_param_exception(e.what()); + } + }); + } + // Currently, we get all the repair options encoded in a single // "options" option, and split it to a map using the "," and ":" // delimiters. 
TODO: consider if it doesn't make more sense to just diff --git a/repair/repair.cc b/repair/repair.cc index 8e06b22336..c349d17453 100644 --- a/repair/repair.cc +++ b/repair/repair.cc @@ -116,9 +116,45 @@ static std::vector get_neighbors(database& db, } // Each repair_start() call returns a unique integer which the user can later -// use to follow the status of this repair. +// use to follow the status of this repair with the repair_status() function. static std::atomic next_repair_command {0}; +// The repair_tracker tracks ongoing repair operations and their progress. +// A repair which has already finished successfully is dropped from this +// table, but a failed repair will remain in the table forever so it can +// be queried about more than once (FIXME: reconsider this. But note that +// failed repairs should be rare anyway). +// This object is not thread safe, and must be used by only one cpu. +static class { +private: + // Note that there are no "SUCCESSFUL" entries in the "status" map: + // Successfully-finished repairs are those with id < next_repair_command + // but aren't listed as running or failed in the status map. + std::unordered_map status; +public: + void start(int id) { + status[id] = repair_status::RUNNING; + } + void done(int id, bool succeeded) { + if (succeeded) { + status.erase(id); + } else { + status[id] = repair_status::FAILED; + } + } + repair_status get(int id) { + if (id >= next_repair_command.load(std::memory_order_relaxed)) { + throw std::runtime_error(sprint("unknown repair id %d", id)); + } + auto it = status.find(id); + if (it == status.end()) { + return repair_status::SUCCESSFUL; + } else { + return it->second; + } + } +} repair_tracker; + // repair_start() can run on any cpu; It runs on cpu0 the function // do_repair_start(). 
The benefit of always running that function on the same // CPU is that it allows us to keep some state (like a list of ongoing @@ -128,9 +164,9 @@ static std::atomic next_repair_command {0}; // Repair a single range. Comparable to RepairSession in Origin // In Origin, this is composed of several "repair jobs", each with one cf, // but our streaming already works for several cfs. -static void repair_range(seastar::sharded& db, sstring keyspace, +static future<> repair_range(seastar::sharded& db, sstring keyspace, query::range range, std::vector cfs) { - auto sp = streaming::stream_plan("repair"); + auto sp = make_lw_shared("repair"); auto id = utils::UUID_gen::get_time_UUID(); auto neighbors = get_neighbors(db.local(), keyspace, range); @@ -140,12 +176,15 @@ static void repair_range(seastar::sharded& db, sstring keyspace, // request ranges from all of them and only later transfer ranges to // all of them? Otherwise, we won't necessarily fully repair the // other ndoes, just this one? What does Cassandra do here? 
- sp.transfer_ranges(peer, peer, keyspace, {range}, cfs); - sp.request_ranges(peer, peer, keyspace, {range}, cfs); - sp.execute().discard_result().handle_exception([id] (auto ep) { - logger.error("repair session #{} stream failed: {}", id, ep); - }); + sp->transfer_ranges(peer, peer, keyspace, {range}, cfs); + sp->request_ranges(peer, peer, keyspace, {range}, cfs); } + return sp->execute().discard_result().then([sp, id] { + logger.info("repair session #{} successful", id); + }).handle_exception([id] (auto ep) { + logger.error("repair session #{} stream failed: {}", id, ep); + return make_exception_future(std::runtime_error("repair_range failed")); + }); } static std::vector> get_ranges_for_endpoint( @@ -161,10 +200,12 @@ static std::vector> get_local_ranges( static void do_repair_start(seastar::sharded& db, sstring keyspace, - std::unordered_map options) { + std::unordered_map options, int id) { logger.info("starting user-requested repair for keyspace {}", keyspace); + repair_tracker.start(id); + // If the "ranges" option is not explicitly specified, we repair all the // local ranges (the token ranges for which this node holds a replica of). // Each of these ranges may have a different set of replicas, so the @@ -192,16 +233,47 @@ static void do_repair_start(seastar::sharded& db, sstring keyspace, // FIXME: let the cfs be overriden by an option std::vector cfs = list_column_families(db.local(), keyspace); +#if 1 + // repair all the ranges in parallel + auto done = make_lw_shared(0); + auto success = make_lw_shared(true); for (auto range : ranges) { - repair_range(db, keyspace, range, cfs); + repair_range(db, keyspace, range, cfs). + handle_exception([success] (std::exception_ptr eptr) + { *success = false; }). 
+ finally([done] { done->signal(); }); } + done->wait(ranges.size()).then([done, id, success] { + logger.info("repair {} complete, success={}", id, success); + repair_tracker.done(id, *success); + }); +#else + // repair all the ranges in sequence + do_with(std::move(ranges), [&db, keyspace, cfs, id] (auto& ranges) { + return do_for_each(ranges.begin(), ranges.end(), [&db, keyspace, cfs, id] (auto&& range) { + return repair_range(db, keyspace, range, cfs); + }).then([id] { + logger.info("repair {} completed successfully", id); + repair_tracker.done(id, true); + }).handle_exception([id] (std::exception_ptr eptr) { + logger.info("repair {} failed", id); + repair_tracker.done(id, false); + }); + }); +#endif } int repair_start(seastar::sharded& db, sstring keyspace, std::unordered_map options) { int i = next_repair_command++; - db.invoke_on(0, [&db, keyspace = std::move(keyspace), options = std::move(options)] (database& localdb) { - do_repair_start(db, std::move(keyspace), std::move(options)); + db.invoke_on(0, [i, &db, keyspace = std::move(keyspace), options = std::move(options)] (database& localdb) { + do_repair_start(db, std::move(keyspace), std::move(options), i); }); // Note we ignore the value of this future return i; } + +future repair_get_status(seastar::sharded& db, int id) { + return db.invoke_on(0, [id] (database& localdb) { + return repair_tracker.get(id); + }); +} diff --git a/repair/repair.hh b/repair/repair.hh index afe4f276a0..ccc843290a 100644 --- a/repair/repair.hh +++ b/repair/repair.hh @@ -27,3 +27,11 @@ public: // operation. int repair_start(seastar::sharded& db, sstring keyspace, std::unordered_map options); + +// TODO: Have repair_progress contain a percentage progress estimator +// instead of just "RUNNING". +enum class repair_status { RUNNING, SUCCESSFUL, FAILED }; + +// repair_get_status() returns a future because it needs to run code on a +// different CPU (cpu 0) and that might be a deferring operation. 
+future repair_get_status(seastar::sharded& db, int id);