diff --git a/api/api.cc b/api/api.cc index 07df2b420a..98dd7b67aa 100644 --- a/api/api.cc +++ b/api/api.cc @@ -90,6 +90,10 @@ future<> set_server_storage_service(http_context& ctx) { return register_api(ctx, "storage_service", "The storage service API", set_storage_service); } +future<> set_server_snapshot(http_context& ctx) { + return ctx.http_server.set_routes([&ctx] (routes& r) { set_snapshot(ctx, r); }); +} + future<> set_server_snitch(http_context& ctx) { return register_api(ctx, "endpoint_snitch_info", "The endpoint snitch info API", set_endpoint_snitch); } diff --git a/api/api_init.hh b/api/api_init.hh index 09a9d360ba..ca837b9c72 100644 --- a/api/api_init.hh +++ b/api/api_init.hh @@ -44,6 +44,7 @@ struct http_context { future<> set_server_init(http_context& ctx); future<> set_server_snitch(http_context& ctx); future<> set_server_storage_service(http_context& ctx); +future<> set_server_snapshot(http_context& ctx); future<> set_server_gossip(http_context& ctx); future<> set_server_load_sstable(http_context& ctx); future<> set_server_messaging_service(http_context& ctx); diff --git a/api/storage_service.cc b/api/storage_service.cc index 5f68a94665..78fdf80223 100644 --- a/api/storage_service.cc +++ b/api/storage_service.cc @@ -78,20 +78,20 @@ static std::vector describe_ring(const sstring& keyspace) { return res; } -void set_storage_service(http_context& ctx, routes& r) { - using ks_cf_func = std::function(std::unique_ptr, sstring, std::vector)>; +using ks_cf_func = std::function(http_context&, std::unique_ptr, sstring, std::vector)>; - auto wrap_ks_cf = [&ctx](ks_cf_func f) { - return [&ctx, f = std::move(f)](std::unique_ptr req) { - auto keyspace = validate_keyspace(ctx, req->param); - auto column_families = split_cf(req->get_query_param("cf")); - if (column_families.empty()) { - column_families = map_keys(ctx.db.local().find_keyspace(keyspace).metadata().get()->cf_meta_data()); - } - return f(std::move(req), std::move(keyspace), std::move(column_families)); - }; +static auto wrap_ks_cf(http_context &ctx, ks_cf_func f) { + return [&ctx, f = std::move(f)](std::unique_ptr req) { + auto keyspace = validate_keyspace(ctx, req->param); + auto column_families = split_cf(req->get_query_param("cf")); + if (column_families.empty()) { + column_families = map_keys(ctx.db.local().find_keyspace(keyspace).metadata().get()->cf_meta_data()); + } + return f(ctx, std::move(req), std::move(keyspace), std::move(column_families)); }; +} +void set_storage_service(http_context& ctx, routes& r) { ss::local_hostid.set(r, [](std::unique_ptr req) { return db::system_keyspace::get_local_host_id().then([](const utils::UUID& id) { return make_ready_future(id.to_sstring()); @@ -222,64 +222,6 @@ void set_storage_service(http_context& ctx, routes& r) { req.get_query_param("key"))); }); - ss::get_snapshot_details.set(r, [](std::unique_ptr req) { - return service::get_local_storage_service().get_snapshot_details().then([] (auto result) { - std::vector res; - for (auto& map: result) { - ss::snapshots all_snapshots; - all_snapshots.key = map.first; - - std::vector snapshot; - for (auto& cf: map.second) { - ss::snapshot s; - s.ks = cf.ks; - s.cf = cf.cf; - s.live = cf.live; - s.total = cf.total; - snapshot.push_back(std::move(s)); - } - all_snapshots.value = std::move(snapshot); - res.push_back(std::move(all_snapshots)); - } - return make_ready_future(std::move(res)); - }); - }); - - ss::take_snapshot.set(r, [](std::unique_ptr req) { - auto tag = req->get_query_param("tag"); - auto column_family = req->get_query_param("cf"); - - std::vector keynames = split(req->get_query_param("kn"), ","); - - auto resp = make_ready_future<>(); - if (column_family.empty()) { - resp = service::get_local_storage_service().take_snapshot(tag, keynames); - } else { - if (keynames.size() > 1) { - throw httpd::bad_param_exception("Only one keyspace allowed when specifying a column family"); - } - resp = service::get_local_storage_service().take_column_family_snapshot(keynames[0], column_family, tag); - } - return resp.then([] { - return make_ready_future(json_void()); - }); - }); - - ss::del_snapshot.set(r, [](std::unique_ptr req) { - auto tag = req->get_query_param("tag"); - - std::vector keynames = split(req->get_query_param("kn"), ","); - return service::get_local_storage_service().clear_snapshot(tag, keynames).then([] { - return make_ready_future(json_void()); - }); - }); - - ss::true_snapshots_size.set(r, [](std::unique_ptr req) { - return service::get_local_storage_service().true_snapshots_size().then([] (int64_t size) { - return make_ready_future(size); - }); - }); - ss::force_keyspace_compaction.set(r, [&ctx](std::unique_ptr req) { auto keyspace = validate_keyspace(ctx, req->param); auto column_families = split_cf(req->get_query_param("cf")); @@ -326,32 +268,7 @@ void set_storage_service(http_context& ctx, routes& r) { }); }); - ss::scrub.set(r, wrap_ks_cf([&ctx](std::unique_ptr req, sstring keyspace, std::vector column_families) { - // TODO: respect this - auto skip_corrupted = req->get_query_param("skip_corrupted"); - - auto f = make_ready_future<>(); - if (!req_param(*req, "disable_snapshot", false)) { - auto tag = format("pre-scrub-{:d}", db_clock::now().time_since_epoch().count()); - f = parallel_for_each(column_families, [keyspace, tag](sstring cf) { - return service::get_local_storage_service().take_column_family_snapshot(keyspace, cf, tag); - }); - } - - return f.then([&ctx, keyspace, column_families] { - return ctx.db.invoke_on_all([=] (database& db) { - return do_for_each(column_families, [=, &db](sstring cfname) { - auto& cm = db.get_compaction_manager(); - auto& cf = db.find_column_family(keyspace, cfname); - return cm.perform_sstable_scrub(&cf); - }); - }); - }).then([]{ - return make_ready_future(0); - }); - })); - - ss::upgrade_sstables.set(r, wrap_ks_cf([&ctx](std::unique_ptr req, sstring keyspace, std::vector column_families) { + ss::upgrade_sstables.set(r, wrap_ks_cf(ctx, [] (http_context& ctx, std::unique_ptr req, sstring keyspace, std::vector column_families) { bool exclude_current_version = req_param(*req, "exclude_current_version", false); return ctx.db.invoke_on_all([=] (database& db) { @@ -1037,4 +954,89 @@ void set_storage_service(http_context& ctx, routes& r) { } +void set_snapshot(http_context& ctx, routes& r) { + ss::get_snapshot_details.set(r, [](std::unique_ptr req) { + return service::get_local_storage_service().get_snapshot_details().then([] (auto result) { + std::vector res; + for (auto& map: result) { + ss::snapshots all_snapshots; + all_snapshots.key = map.first; + + std::vector snapshot; + for (auto& cf: map.second) { + ss::snapshot s; + s.ks = cf.ks; + s.cf = cf.cf; + s.live = cf.live; + s.total = cf.total; + snapshot.push_back(std::move(s)); + } + all_snapshots.value = std::move(snapshot); + res.push_back(std::move(all_snapshots)); + } + return make_ready_future(std::move(res)); + }); + }); + + ss::take_snapshot.set(r, [](std::unique_ptr req) { + auto tag = req->get_query_param("tag"); + auto column_family = req->get_query_param("cf"); + + std::vector keynames = split(req->get_query_param("kn"), ","); + + auto resp = make_ready_future<>(); + if (column_family.empty()) { + resp = service::get_local_storage_service().take_snapshot(tag, keynames); + } else { + if (keynames.size() > 1) { + throw httpd::bad_param_exception("Only one keyspace allowed when specifying a column family"); + } + resp = service::get_local_storage_service().take_column_family_snapshot(keynames[0], column_family, tag); + } + return resp.then([] { + return make_ready_future(json_void()); + }); + }); + + ss::del_snapshot.set(r, [](std::unique_ptr req) { + auto tag = req->get_query_param("tag"); + + std::vector keynames = split(req->get_query_param("kn"), ","); + return service::get_local_storage_service().clear_snapshot(tag, keynames).then([] { + return make_ready_future(json_void()); + }); + }); + + ss::true_snapshots_size.set(r, [](std::unique_ptr req) { + return service::get_local_storage_service().true_snapshots_size().then([] (int64_t size) { + return make_ready_future(size); + }); + }); + + ss::scrub.set(r, wrap_ks_cf(ctx, [] (http_context& ctx, std::unique_ptr req, sstring keyspace, std::vector column_families) { + // TODO: respect this + auto skip_corrupted = req->get_query_param("skip_corrupted"); + + auto f = make_ready_future<>(); + if (!req_param(*req, "disable_snapshot", false)) { + auto tag = format("pre-scrub-{:d}", db_clock::now().time_since_epoch().count()); + f = parallel_for_each(column_families, [keyspace, tag](sstring cf) { + return service::get_local_storage_service().take_column_family_snapshot(keyspace, cf, tag); + }); + } + + return f.then([&ctx, keyspace, column_families] { + return ctx.db.invoke_on_all([=] (database& db) { + return do_for_each(column_families, [=, &db](sstring cfname) { + auto& cm = db.get_compaction_manager(); + auto& cf = db.find_column_family(keyspace, cfname); + return cm.perform_sstable_scrub(&cf); + }); + }); + }).then([]{ + return make_ready_future(0); + }); + })); +} + } diff --git a/api/storage_service.hh b/api/storage_service.hh index 301b60fa7d..e97480fd33 100644 --- a/api/storage_service.hh +++ b/api/storage_service.hh @@ -26,5 +26,6 @@ namespace api { void set_storage_service(http_context& ctx, routes& r); +void set_snapshot(http_context& ctx, routes& r); } diff --git a/main.cc b/main.cc index e289a224f9..4007c04213 100644 --- a/main.cc +++ b/main.cc @@ -950,6 +950,11 @@ int main(int ac, char** av) { api::set_server_messaging_service(ctx).get(); api::set_server_storage_service(ctx).get(); ss.init_server_without_the_messaging_service_part().get(); + api::set_server_snapshot(ctx).get(); + auto stop_snapshots = defer_verbose_shutdown("snapshots", [] { + service::get_storage_service().invoke_on_all(&service::storage_service::snapshots_close).get(); + }); + supervisor::notify("starting batchlog manager"); db::get_batchlog_manager().invoke_on_all([] (db::batchlog_manager& b) { return b.start(); diff --git a/service/storage_service.cc b/service/storage_service.cc index 735302ca9d..7f8fbd1702 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -1998,15 +1998,21 @@ future<> check_snapshot_not_exist(database& db, sstring ks_name, sstring name) { template std::result_of_t storage_service::run_snapshot_modify_operation(Func&& f) { - return smp::submit_to(0, [f = std::move(f)] () mutable { - return with_lock(get_local_storage_service()._snapshot_lock.for_write(), std::move(f)); + auto& ss = get_storage_service(); + return with_gate(ss.local()._snapshot_ops, [f = std::move(f), &ss] () { + return ss.invoke_on(0, [f = std::move(f)] (auto& ss) mutable { + return with_lock(ss._snapshot_lock.for_write(), std::move(f)); + }); }); } template std::result_of_t storage_service::run_snapshot_list_operation(Func&& f) { - return smp::submit_to(0, [f = std::move(f)] () mutable { - return with_lock(get_local_storage_service()._snapshot_lock.for_read(), std::move(f)); + auto& ss = get_storage_service(); + return with_gate(ss.local()._snapshot_ops, [f = std::move(f), &ss] () { + return ss.invoke_on(0, [f = std::move(f)] (auto& ss) mutable { + return with_lock(ss._snapshot_lock.for_read(), std::move(f)); + }); }); } @@ -2020,11 +2026,6 @@ future<> storage_service::take_snapshot(sstring tag, std::vector keyspa }; return run_snapshot_modify_operation([tag = std::move(tag), keyspace_names = std::move(keyspace_names), this] { - auto mode = get_local_storage_service()._operation_mode; - if (mode == storage_service::mode::JOINING) { - throw std::runtime_error("Cannot snapshot until bootstrap completes"); - } - return parallel_for_each(keyspace_names, [tag, this] (auto& ks_name) { return check_snapshot_not_exist(_db.local(), ks_name, tag); }).then([this, tag, keyspace_names] { @@ -2058,10 +2059,6 @@ future<> storage_service::take_column_family_snapshot(sstring ks_name, sstring c } return run_snapshot_modify_operation([this, ks_name = std::move(ks_name), cf_name = std::move(cf_name), tag = std::move(tag)] { - auto mode = get_local_storage_service()._operation_mode; - if (mode == storage_service::mode::JOINING) { - throw std::runtime_error("Cannot snapshot until bootstrap completes"); - } return check_snapshot_not_exist(_db.local(), ks_name, tag).then([this, ks_name, cf_name, tag] { return _db.invoke_on_all([ks_name, cf_name, tag] (database &db) { auto& cf = db.find_column_family(ks_name, cf_name); diff --git a/service/storage_service.hh b/service/storage_service.hh index d32ae6ae0e..4503c41db1 100644 --- a/service/storage_service.hh +++ b/service/storage_service.hh @@ -52,6 +52,7 @@ #include "gms/application_state.hh" #include "db/system_distributed_keyspace.hh" #include +#include #include "utils/fb_utilities.hh" #include "utils/serialized_action.hh" #include "database_fwd.hh" @@ -265,6 +266,7 @@ private: bool _joined = false; seastar::rwlock _snapshot_lock; + seastar::gate _snapshot_ops; template static std::result_of_t run_snapshot_modify_operation(Func&&); @@ -273,6 +275,11 @@ private: static std::result_of_t run_snapshot_list_operation(Func&&); public: enum class mode { STARTING, NORMAL, JOINING, LEAVING, DECOMMISSIONED, MOVING, DRAINING, DRAINED }; + + future<> snapshots_close() { + return _snapshot_ops.close(); + } + private: mode _operation_mode = mode::STARTING; friend std::ostream& operator<<(std::ostream& os, const mode& mode);