Merge "Fix two corner cases in snapshots API" from Pavel

"
There seem to be two problems with handling the snapshot API -- one
on start and the other on stop. Here's the set that addresses
both.

The fix moves snapshot API registration to a later point in startup, which required
Amnon's ACK. Now we have it :) so -- the rebase and resend.

Tests: unit(dev), start-stop
"

* 'br-snapshot-bugs-2' of https://github.com/xemul/scylla:
  snapshot: Pass requests through gate
  api: Register snapshot API later
  api: Unwrap wrap_ks_cf
This commit is contained in:
Avi Kivity
2020-01-28 18:18:48 +02:00
7 changed files with 125 additions and 108 deletions

View File

@@ -90,6 +90,10 @@ future<> set_server_storage_service(http_context& ctx) {
return register_api(ctx, "storage_service", "The storage service API", set_storage_service);
}
// Installs the snapshot REST handlers on the HTTP server owned by `ctx`.
// The handlers themselves are registered by set_snapshot(); this wrapper only
// hands a callback to the server's set_routes() and returns its future.
future<> set_server_snapshot(http_context& ctx) {
    // `ctx` is captured by reference, so it must stay alive for as long as
    // the callback (and whatever set_snapshot() registers) can be invoked.
    auto install_snapshot_routes = [&ctx] (routes& rt) {
        set_snapshot(ctx, rt);
    };
    return ctx.http_server.set_routes(install_snapshot_routes);
}
// Registers the "endpoint_snitch_info" API group through register_api(),
// passing set_endpoint_snitch as the route-setup function, and returns the
// future produced by register_api().
future<> set_server_snitch(http_context& ctx) {
return register_api(ctx, "endpoint_snitch_info", "The endpoint snitch info API", set_endpoint_snitch);
}

View File

@@ -44,6 +44,7 @@ struct http_context {
future<> set_server_init(http_context& ctx);
future<> set_server_snitch(http_context& ctx);
future<> set_server_storage_service(http_context& ctx);
future<> set_server_snapshot(http_context& ctx);
future<> set_server_gossip(http_context& ctx);
future<> set_server_load_sstable(http_context& ctx);
future<> set_server_messaging_service(http_context& ctx);

View File

@@ -78,20 +78,20 @@ static std::vector<ss::token_range> describe_ring(const sstring& keyspace) {
return res;
}
void set_storage_service(http_context& ctx, routes& r) {
using ks_cf_func = std::function<future<json::json_return_type>(std::unique_ptr<request>, sstring, std::vector<sstring>)>;
using ks_cf_func = std::function<future<json::json_return_type>(http_context&, std::unique_ptr<request>, sstring, std::vector<sstring>)>;
auto wrap_ks_cf = [&ctx](ks_cf_func f) {
return [&ctx, f = std::move(f)](std::unique_ptr<request> req) {
auto keyspace = validate_keyspace(ctx, req->param);
auto column_families = split_cf(req->get_query_param("cf"));
if (column_families.empty()) {
column_families = map_keys(ctx.db.local().find_keyspace(keyspace).metadata().get()->cf_meta_data());
}
return f(std::move(req), std::move(keyspace), std::move(column_families));
};
// Adapts a keyspace/column-family handler into a plain request handler.
//
// The returned callable:
//   1. validates the keyspace taken from the request's path parameters,
//   2. parses the "cf" query parameter into a list of column family names,
//   3. if that list is empty, substitutes the keys of the keyspace's
//      cf_meta_data() (i.e. every column family known for that keyspace),
//   4. forwards ctx, the request, the keyspace and the list to `f`.
//
// `ctx` is captured by reference and must outlive the returned callable;
// `f` is moved into it.
static auto wrap_ks_cf(http_context &ctx, ks_cf_func f) {
    return [&ctx, f = std::move(f)](std::unique_ptr<request> req) {
        auto ks_name = validate_keyspace(ctx, req->param);
        auto cf_names = split_cf(req->get_query_param("cf"));
        if (cf_names.empty()) {
            // No explicit selection: fall back to all column families of the keyspace.
            cf_names = map_keys(ctx.db.local().find_keyspace(ks_name).metadata().get()->cf_meta_data());
        }
        return f(ctx, std::move(req), std::move(ks_name), std::move(cf_names));
    };
}
void set_storage_service(http_context& ctx, routes& r) {
ss::local_hostid.set(r, [](std::unique_ptr<request> req) {
return db::system_keyspace::get_local_host_id().then([](const utils::UUID& id) {
return make_ready_future<json::json_return_type>(id.to_sstring());
@@ -222,64 +222,6 @@ void set_storage_service(http_context& ctx, routes& r) {
req.get_query_param("key")));
});
ss::get_snapshot_details.set(r, [](std::unique_ptr<request> req) {
return service::get_local_storage_service().get_snapshot_details().then([] (auto result) {
std::vector<ss::snapshots> res;
for (auto& map: result) {
ss::snapshots all_snapshots;
all_snapshots.key = map.first;
std::vector<ss::snapshot> snapshot;
for (auto& cf: map.second) {
ss::snapshot s;
s.ks = cf.ks;
s.cf = cf.cf;
s.live = cf.live;
s.total = cf.total;
snapshot.push_back(std::move(s));
}
all_snapshots.value = std::move(snapshot);
res.push_back(std::move(all_snapshots));
}
return make_ready_future<json::json_return_type>(std::move(res));
});
});
ss::take_snapshot.set(r, [](std::unique_ptr<request> req) {
auto tag = req->get_query_param("tag");
auto column_family = req->get_query_param("cf");
std::vector<sstring> keynames = split(req->get_query_param("kn"), ",");
auto resp = make_ready_future<>();
if (column_family.empty()) {
resp = service::get_local_storage_service().take_snapshot(tag, keynames);
} else {
if (keynames.size() > 1) {
throw httpd::bad_param_exception("Only one keyspace allowed when specifying a column family");
}
resp = service::get_local_storage_service().take_column_family_snapshot(keynames[0], column_family, tag);
}
return resp.then([] {
return make_ready_future<json::json_return_type>(json_void());
});
});
ss::del_snapshot.set(r, [](std::unique_ptr<request> req) {
auto tag = req->get_query_param("tag");
std::vector<sstring> keynames = split(req->get_query_param("kn"), ",");
return service::get_local_storage_service().clear_snapshot(tag, keynames).then([] {
return make_ready_future<json::json_return_type>(json_void());
});
});
ss::true_snapshots_size.set(r, [](std::unique_ptr<request> req) {
return service::get_local_storage_service().true_snapshots_size().then([] (int64_t size) {
return make_ready_future<json::json_return_type>(size);
});
});
ss::force_keyspace_compaction.set(r, [&ctx](std::unique_ptr<request> req) {
auto keyspace = validate_keyspace(ctx, req->param);
auto column_families = split_cf(req->get_query_param("cf"));
@@ -326,32 +268,7 @@ void set_storage_service(http_context& ctx, routes& r) {
});
});
ss::scrub.set(r, wrap_ks_cf([&ctx](std::unique_ptr<request> req, sstring keyspace, std::vector<sstring> column_families) {
// TODO: respect this
auto skip_corrupted = req->get_query_param("skip_corrupted");
auto f = make_ready_future<>();
if (!req_param<bool>(*req, "disable_snapshot", false)) {
auto tag = format("pre-scrub-{:d}", db_clock::now().time_since_epoch().count());
f = parallel_for_each(column_families, [keyspace, tag](sstring cf) {
return service::get_local_storage_service().take_column_family_snapshot(keyspace, cf, tag);
});
}
return f.then([&ctx, keyspace, column_families] {
return ctx.db.invoke_on_all([=] (database& db) {
return do_for_each(column_families, [=, &db](sstring cfname) {
auto& cm = db.get_compaction_manager();
auto& cf = db.find_column_family(keyspace, cfname);
return cm.perform_sstable_scrub(&cf);
});
});
}).then([]{
return make_ready_future<json::json_return_type>(0);
});
}));
ss::upgrade_sstables.set(r, wrap_ks_cf([&ctx](std::unique_ptr<request> req, sstring keyspace, std::vector<sstring> column_families) {
ss::upgrade_sstables.set(r, wrap_ks_cf(ctx, [] (http_context& ctx, std::unique_ptr<request> req, sstring keyspace, std::vector<sstring> column_families) {
bool exclude_current_version = req_param<bool>(*req, "exclude_current_version", false);
return ctx.db.invoke_on_all([=] (database& db) {
@@ -1037,4 +954,89 @@ void set_storage_service(http_context& ctx, routes& r) {
}
// Registers the snapshot-related REST handlers on `r`:
//   - get_snapshot_details: lists snapshots with per-table live/total sizes
//   - take_snapshot:        snapshots whole keyspaces or a single column family
//   - del_snapshot:         clears snapshots by tag for the given keyspaces
//   - true_snapshots_size:  reports the size returned by the storage service
//   - scrub:                optionally snapshots, then scrubs the selected tables
//
// All snapshot work is delegated to the local storage_service instance.
// `ctx` is only used by the scrub handler (via wrap_ks_cf), which keeps a
// reference to it.
void set_snapshot(http_context& ctx, routes& r) {
    ss::get_snapshot_details.set(r, [](std::unique_ptr<request> req) {
        return service::get_local_storage_service().get_snapshot_details().then([] (auto result) {
            // Convert the storage service's map (key -> list of per-table
            // details) into the JSON-serializable API types.
            std::vector<ss::snapshots> res;
            for (auto& map: result) {
                ss::snapshots all_snapshots;
                all_snapshots.key = map.first;

                std::vector<ss::snapshot> snapshot;
                for (auto& cf: map.second) {
                    ss::snapshot s;
                    s.ks = cf.ks;
                    s.cf = cf.cf;
                    s.live = cf.live;
                    s.total = cf.total;
                    snapshot.push_back(std::move(s));
                }
                all_snapshots.value = std::move(snapshot);
                res.push_back(std::move(all_snapshots));
            }
            return make_ready_future<json::json_return_type>(std::move(res));
        });
    });

    ss::take_snapshot.set(r, [](std::unique_ptr<request> req) {
        auto tag = req->get_query_param("tag");
        auto column_family = req->get_query_param("cf");

        // "kn" is a comma-separated list of keyspace names.
        std::vector<sstring> keynames = split(req->get_query_param("kn"), ",");

        auto resp = make_ready_future<>();
        if (column_family.empty()) {
            resp = service::get_local_storage_service().take_snapshot(tag, keynames);
        } else {
            // A column family snapshot needs exactly one keyspace. Reject a
            // missing keyspace explicitly instead of indexing an empty vector.
            if (keynames.empty()) {
                throw httpd::bad_param_exception("The keyspace of column families must be specified");
            }
            if (keynames.size() > 1) {
                throw httpd::bad_param_exception("Only one keyspace allowed when specifying a column family");
            }
            resp = service::get_local_storage_service().take_column_family_snapshot(keynames[0], column_family, tag);
        }
        return resp.then([] {
            return make_ready_future<json::json_return_type>(json_void());
        });
    });

    ss::del_snapshot.set(r, [](std::unique_ptr<request> req) {
        auto tag = req->get_query_param("tag");

        std::vector<sstring> keynames = split(req->get_query_param("kn"), ",");
        return service::get_local_storage_service().clear_snapshot(tag, keynames).then([] {
            return make_ready_future<json::json_return_type>(json_void());
        });
    });

    ss::true_snapshots_size.set(r, [](std::unique_ptr<request> req) {
        return service::get_local_storage_service().true_snapshots_size().then([] (int64_t size) {
            return make_ready_future<json::json_return_type>(size);
        });
    });

    ss::scrub.set(r, wrap_ks_cf(ctx, [] (http_context& ctx, std::unique_ptr<request> req, sstring keyspace, std::vector<sstring> column_families) {
        // TODO: respect this
        auto skip_corrupted = req->get_query_param("skip_corrupted");

        auto f = make_ready_future<>();
        // Unless disabled by the caller, snapshot every selected column
        // family under a common "pre-scrub-<timestamp>" tag before scrubbing.
        if (!req_param<bool>(*req, "disable_snapshot", false)) {
            auto tag = format("pre-scrub-{:d}", db_clock::now().time_since_epoch().count());
            f = parallel_for_each(column_families, [keyspace, tag](sstring cf) {
                return service::get_local_storage_service().take_column_family_snapshot(keyspace, cf, tag);
            });
        }

        // Then scrub on every database instance, one column family at a time.
        return f.then([&ctx, keyspace, column_families] {
            return ctx.db.invoke_on_all([=] (database& db) {
                return do_for_each(column_families, [=, &db](sstring cfname) {
                    auto& cm = db.get_compaction_manager();
                    auto& cf = db.find_column_family(keyspace, cfname);
                    return cm.perform_sstable_scrub(&cf);
                });
            });
        }).then([]{
            return make_ready_future<json::json_return_type>(0);
        });
    }));
}
}

View File

@@ -26,5 +26,6 @@
namespace api {
void set_storage_service(http_context& ctx, routes& r);
void set_snapshot(http_context& ctx, routes& r);
}

View File

@@ -950,6 +950,11 @@ int main(int ac, char** av) {
api::set_server_messaging_service(ctx).get();
api::set_server_storage_service(ctx).get();
ss.init_server_without_the_messaging_service_part().get();
api::set_server_snapshot(ctx).get();
auto stop_snapshots = defer_verbose_shutdown("snapshots", [] {
service::get_storage_service().invoke_on_all(&service::storage_service::snapshots_close).get();
});
supervisor::notify("starting batchlog manager");
db::get_batchlog_manager().invoke_on_all([] (db::batchlog_manager& b) {
return b.start();

View File

@@ -1998,15 +1998,21 @@ future<> check_snapshot_not_exist(database& db, sstring ks_name, sstring name) {
template <typename Func>
std::result_of_t<Func()> storage_service::run_snapshot_modify_operation(Func&& f) {
return smp::submit_to(0, [f = std::move(f)] () mutable {
return with_lock(get_local_storage_service()._snapshot_lock.for_write(), std::move(f));
auto& ss = get_storage_service();
return with_gate(ss.local()._snapshot_ops, [f = std::move(f), &ss] () {
return ss.invoke_on(0, [f = std::move(f)] (auto& ss) mutable {
return with_lock(ss._snapshot_lock.for_write(), std::move(f));
});
});
}
template <typename Func>
std::result_of_t<Func()> storage_service::run_snapshot_list_operation(Func&& f) {
return smp::submit_to(0, [f = std::move(f)] () mutable {
return with_lock(get_local_storage_service()._snapshot_lock.for_read(), std::move(f));
auto& ss = get_storage_service();
return with_gate(ss.local()._snapshot_ops, [f = std::move(f), &ss] () {
return ss.invoke_on(0, [f = std::move(f)] (auto& ss) mutable {
return with_lock(ss._snapshot_lock.for_read(), std::move(f));
});
});
}
@@ -2020,11 +2026,6 @@ future<> storage_service::take_snapshot(sstring tag, std::vector<sstring> keyspa
};
return run_snapshot_modify_operation([tag = std::move(tag), keyspace_names = std::move(keyspace_names), this] {
auto mode = get_local_storage_service()._operation_mode;
if (mode == storage_service::mode::JOINING) {
throw std::runtime_error("Cannot snapshot until bootstrap completes");
}
return parallel_for_each(keyspace_names, [tag, this] (auto& ks_name) {
return check_snapshot_not_exist(_db.local(), ks_name, tag);
}).then([this, tag, keyspace_names] {
@@ -2058,10 +2059,6 @@ future<> storage_service::take_column_family_snapshot(sstring ks_name, sstring c
}
return run_snapshot_modify_operation([this, ks_name = std::move(ks_name), cf_name = std::move(cf_name), tag = std::move(tag)] {
auto mode = get_local_storage_service()._operation_mode;
if (mode == storage_service::mode::JOINING) {
throw std::runtime_error("Cannot snapshot until bootstrap completes");
}
return check_snapshot_not_exist(_db.local(), ks_name, tag).then([this, ks_name, cf_name, tag] {
return _db.invoke_on_all([ks_name, cf_name, tag] (database &db) {
auto& cf = db.find_column_family(ks_name, cf_name);

View File

@@ -52,6 +52,7 @@
#include "gms/application_state.hh"
#include "db/system_distributed_keyspace.hh"
#include <seastar/core/semaphore.hh>
#include <seastar/core/gate.hh>
#include "utils/fb_utilities.hh"
#include "utils/serialized_action.hh"
#include "database_fwd.hh"
@@ -265,6 +266,7 @@ private:
bool _joined = false;
seastar::rwlock _snapshot_lock;
seastar::gate _snapshot_ops;
template <typename Func>
static std::result_of_t<Func()> run_snapshot_modify_operation(Func&&);
@@ -273,6 +275,11 @@ private:
static std::result_of_t<Func()> run_snapshot_list_operation(Func&&);
public:
enum class mode { STARTING, NORMAL, JOINING, LEAVING, DECOMMISSIONED, MOVING, DRAINING, DRAINED };
// Closes the _snapshot_ops gate and returns the future from gate::close().
// NOTE(review): per seastar::gate semantics this presumably resolves once
// in-flight snapshot operations have left the gate and makes later entries
// fail -- confirm against the Seastar gate documentation.
future<> snapshots_close() {
return _snapshot_ops.close();
}
private:
mode _operation_mode = mode::STARTING;
friend std::ostream& operator<<(std::ostream& os, const mode& mode);