repair: do not use an atomic integer

Avi asked not to use an atomic integer to produce ids for repair
operations. The existing code had another bug: It could return some
id immediately, but because our start_repair() hasn't started running
code on cpu 0 yet, the new id was not yet registered and if we were to
call repair_get_status() for this id too quickly, it could fail.

The solution for both issues is that start_repair() should return not
an int, but a future<int>: the integer id is incremented on cpu 0 (so
no atomics are needed), and then returned and the future is fulfilled.

Note that the future returned by start_repair() does not wait for the
repair to be over - just for its index to be registered and be usable
to a call to repair_get_status().

Signed-off-by: Nadav Har'El <nyh@cloudius-systems.com>
This commit is contained in:
Nadav Har'El
2015-08-31 02:24:59 +03:00
committed by Avi Kivity
parent 821d81786e
commit cc4117d6c1
3 changed files with 31 additions and 20 deletions

View File

@@ -233,14 +233,15 @@ void set_storage_service(http_context& ctx, routes& r) {
});
});
ss::repair_async.set(r, [&ctx](const_req req) {
ss::repair_async.set(r, [&ctx](std::unique_ptr<request> req) {
// Currently, we get all the repair options encoded in a single
// "options" option, and split it to a map using the "," and ":"
// delimiters. TODO: consider if it doesn't make more sense to just
// take all the query parameters as this map and pass it to the repair
// function.
std::unordered_map<sstring, sstring> options_map;
for (auto s : split(req.get_query_param("options"), ",")) {
for (auto s : split(req->get_query_param("options"), ",")) {
auto kv = split(s, ":");
if (kv.size() != 2) {
throw httpd::bad_param_exception("malformed async repair options");
@@ -252,7 +253,10 @@ void set_storage_service(http_context& ctx, routes& r) {
// returns immediately, not waiting for the repair to finish. The user
// then has other mechanisms to track the ongoing repair's progress,
// or stop it.
return repair_start(ctx.db, validate_keyspace(ctx, req.param), options_map);
return repair_start(ctx.db, validate_keyspace(ctx, req->param),
options_map).then([] (int i) {
return make_ready_future<json::json_return_type>(i);
});
});
ss::repair_async_status.set(r, [&ctx](std::unique_ptr<request> req) {

View File

@@ -115,9 +115,6 @@ static std::vector<gms::inet_address> get_neighbors(database& db,
#endif
}
// Each repair_start() call returns a unique integer which the user can later
// use to follow the status of this repair with the repair_status() function.
static std::atomic<int> next_repair_command {0};
// The repair_tracker tracks ongoing repair operations and their progress.
// A repair which has already finished successfully is dropped from this
@@ -127,8 +124,11 @@ static std::atomic<int> next_repair_command {0};
// This object is not thread safe, and must be used by only one cpu.
static class {
private:
// Each repair_start() call returns a unique int which the user can later
// use to follow the status of this repair with repair_status().
int _next_repair_command = 0;
// Note that there are no "SUCCESSFUL" entries in the "status" map:
// Successfully-finished repairs are those with id < next_repair_command
// Successfully-finished repairs are those with id < _next_repair_command
// but aren't listed as running or failed the status map.
std::unordered_map<int, repair_status> _status;
public:
@@ -143,7 +143,7 @@ public:
}
}
repair_status get(int id) {
if (id >= next_repair_command.load(std::memory_order_relaxed)) {
if (id >= _next_repair_command) {
throw std::runtime_error(sprint("unknown repair id %d", id));
}
auto it = _status.find(id);
@@ -153,6 +153,9 @@ public:
return it->second;
}
}
int next_repair_command() {
return _next_repair_command++;
}
} repair_tracker;
// repair_start() can run on any cpu; It runs on cpu0 the function
@@ -198,11 +201,10 @@ static std::vector<query::range<dht::token>> get_local_ranges(
return get_ranges_for_endpoint(db, keyspace, utils::fb_utilities::get_broadcast_address());
}
static void do_repair_start(seastar::sharded<database>& db, sstring keyspace,
std::unordered_map<sstring, sstring> options, int id) {
logger.info("starting user-requested repair for keyspace {}", keyspace);
static int do_repair_start(seastar::sharded<database>& db, sstring keyspace,
std::unordered_map<sstring, sstring> options) {
int id = repair_tracker.next_repair_command();
logger.info("starting user-requested repair for keyspace {}, repair id {}", keyspace, id);
repair_tracker.start(id);
@@ -250,15 +252,15 @@ static void do_repair_start(seastar::sharded<database>& db, sstring keyspace,
repair_tracker.done(id, false);
});
});
return id;
}
int repair_start(seastar::sharded<database>& db, sstring keyspace,
future<int> repair_start(seastar::sharded<database>& db, sstring keyspace,
std::unordered_map<sstring, sstring> options) {
int i = next_repair_command++;
db.invoke_on(0, [i, &db, keyspace = std::move(keyspace), options = std::move(options)] (database& localdb) {
do_repair_start(db, std::move(keyspace), std::move(options), i);
}); // Note we ignore the value of this future
return i;
return db.invoke_on(0, [&db, keyspace = std::move(keyspace), options = std::move(options)] (database& localdb) {
return do_repair_start(db, std::move(keyspace), std::move(options));
});
}
future<repair_status> repair_get_status(seastar::sharded<database>& db, int id) {

View File

@@ -25,7 +25,12 @@ public:
// NOTE: repair_start() can be run on any node, but starts a node-global
// operation.
int repair_start(seastar::sharded<database>& db, sstring keyspace,
// repair_start() starts the requested repair on this node. It returns an
// integer id which can be used to query the repair's status with
// repair_get_status(). The returned future<int> becomes available quickly,
// as soon as repair_get_status() can be used - it doesn't wait for the
// repair to complete.
future<int> repair_start(seastar::sharded<database>& db, sstring keyspace,
std::unordered_map<sstring, sstring> options);
// TODO: Have repair_progress contains a percentage progress estimator