From 4886c1db747b8f67936f8c0d42155737d1d57612 Mon Sep 17 00:00:00 2001 From: Pavel Emelyanov Date: Thu, 19 Dec 2019 14:52:07 +0300 Subject: [PATCH 1/3] api: Unwrap wrap_ks_cf This is preparation for the next patch -- the lambda in question (and the used type) will be needed in two functions, so make the lambda a "real" function. Signed-off-by: Pavel Emelyanov --- api/storage_service.cc | 26 +++++++++++++------------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/api/storage_service.cc b/api/storage_service.cc index 5f68a94665..77b4b67521 100644 --- a/api/storage_service.cc +++ b/api/storage_service.cc @@ -78,20 +78,20 @@ static std::vector describe_ring(const sstring& keyspace) { return res; } -void set_storage_service(http_context& ctx, routes& r) { - using ks_cf_func = std::function(std::unique_ptr, sstring, std::vector)>; +using ks_cf_func = std::function(http_context&, std::unique_ptr, sstring, std::vector)>; - auto wrap_ks_cf = [&ctx](ks_cf_func f) { - return [&ctx, f = std::move(f)](std::unique_ptr req) { - auto keyspace = validate_keyspace(ctx, req->param); - auto column_families = split_cf(req->get_query_param("cf")); - if (column_families.empty()) { - column_families = map_keys(ctx.db.local().find_keyspace(keyspace).metadata().get()->cf_meta_data()); - } - return f(std::move(req), std::move(keyspace), std::move(column_families)); - }; +static auto wrap_ks_cf(http_context &ctx, ks_cf_func f) { + return [&ctx, f = std::move(f)](std::unique_ptr req) { + auto keyspace = validate_keyspace(ctx, req->param); + auto column_families = split_cf(req->get_query_param("cf")); + if (column_families.empty()) { + column_families = map_keys(ctx.db.local().find_keyspace(keyspace).metadata().get()->cf_meta_data()); + } + return f(ctx, std::move(req), std::move(keyspace), std::move(column_families)); }; +} +void set_storage_service(http_context& ctx, routes& r) { ss::local_hostid.set(r, [](std::unique_ptr req) { return db::system_keyspace::get_local_host_id().then([](const utils::UUID& id) { return make_ready_future(id.to_sstring()); @@ -326,7 +326,7 @@ void set_storage_service(http_context& ctx, routes& r) { }); }); - ss::scrub.set(r, wrap_ks_cf([&ctx](std::unique_ptr req, sstring keyspace, std::vector column_families) { + ss::scrub.set(r, wrap_ks_cf(ctx, [] (http_context& ctx, std::unique_ptr req, sstring keyspace, std::vector column_families) { // TODO: respect this auto skip_corrupted = req->get_query_param("skip_corrupted"); @@ -351,7 +351,7 @@ void set_storage_service(http_context& ctx, routes& r) { }); })); - ss::upgrade_sstables.set(r, wrap_ks_cf([&ctx](std::unique_ptr req, sstring keyspace, std::vector column_families) { + ss::upgrade_sstables.set(r, wrap_ks_cf(ctx, [] (http_context& ctx, std::unique_ptr req, sstring keyspace, std::vector column_families) { bool exclude_current_version = req_param(*req, "exclude_current_version", false); return ctx.db.invoke_on_all([=] (database& db) { From fd6b5efe75760bb1454a631e2a4f898ebb55c27e Mon Sep 17 00:00:00 2001 From: Pavel Emelyanov Date: Thu, 19 Dec 2019 14:58:05 +0300 Subject: [PATCH 2/3] api: Register snapshot API later In storage_service's snapshot code there are checks for _operation_mode being _not_ JOINING to proceed. The intention is apparently to allow for snapshots only after the cluster join. However, here's how the start-up code looks like - _operation_mode = STARTING in storage_service::constructor - snapshot API registered in api::set_server_storage_service - _operation_mode = JOINING in storage_service::join_token_ring So in between steps 2 and 3 snapshots can be taken. Although there's a quick and simple fix for that (check for the _operation_mode to be not STARTING either) I think it's better to register the snapshot API later instead. This will help greatly to de-bload the storage_service, in particular -- to incapsulate the _operation_mode properly. Note, though the check for _operation_mode is made only for taking snapshot, I move all snapshot ops registration to the later phase. Signed-off-by: Pavel Emelyanov --- api/api.cc | 4 + api/api_init.hh | 1 + api/storage_service.cc | 168 +++++++++++++++++++------------------ api/storage_service.hh | 1 + main.cc | 2 + service/storage_service.cc | 9 -- 6 files changed, 93 insertions(+), 92 deletions(-) diff --git a/api/api.cc b/api/api.cc index 07df2b420a..98dd7b67aa 100644 --- a/api/api.cc +++ b/api/api.cc @@ -90,6 +90,10 @@ future<> set_server_storage_service(http_context& ctx) { return register_api(ctx, "storage_service", "The storage service API", set_storage_service); } +future<> set_server_snapshot(http_context& ctx) { + return ctx.http_server.set_routes([&ctx] (routes& r) { set_snapshot(ctx, r); }); +} + future<> set_server_snitch(http_context& ctx) { return register_api(ctx, "endpoint_snitch_info", "The endpoint snitch info API", set_endpoint_snitch); } diff --git a/api/api_init.hh b/api/api_init.hh index 09a9d360ba..ca837b9c72 100644 --- a/api/api_init.hh +++ b/api/api_init.hh @@ -44,6 +44,7 @@ struct http_context { future<> set_server_init(http_context& ctx); future<> set_server_snitch(http_context& ctx); future<> set_server_storage_service(http_context& ctx); +future<> set_server_snapshot(http_context& ctx); future<> set_server_gossip(http_context& ctx); future<> set_server_load_sstable(http_context& ctx); future<> set_server_messaging_service(http_context& ctx); diff --git a/api/storage_service.cc b/api/storage_service.cc index 77b4b67521..78fdf80223 100644 --- a/api/storage_service.cc +++ b/api/storage_service.cc @@ -222,64 +222,6 @@ void set_storage_service(http_context& ctx, routes& r) { req.get_query_param("key"))); }); - ss::get_snapshot_details.set(r, [](std::unique_ptr req) { - return service::get_local_storage_service().get_snapshot_details().then([] (auto result) { - std::vector res; - for (auto& map: result) { - ss::snapshots all_snapshots; - all_snapshots.key = map.first; - - std::vector snapshot; - for (auto& cf: map.second) { - ss::snapshot s; - s.ks = cf.ks; - s.cf = cf.cf; - s.live = cf.live; - s.total = cf.total; - snapshot.push_back(std::move(s)); - } - all_snapshots.value = std::move(snapshot); - res.push_back(std::move(all_snapshots)); - } - return make_ready_future(std::move(res)); - }); - }); - - ss::take_snapshot.set(r, [](std::unique_ptr req) { - auto tag = req->get_query_param("tag"); - auto column_family = req->get_query_param("cf"); - - std::vector keynames = split(req->get_query_param("kn"), ","); - - auto resp = make_ready_future<>(); - if (column_family.empty()) { - resp = service::get_local_storage_service().take_snapshot(tag, keynames); - } else { - if (keynames.size() > 1) { - throw httpd::bad_param_exception("Only one keyspace allowed when specifying a column family"); - } - resp = service::get_local_storage_service().take_column_family_snapshot(keynames[0], column_family, tag); - } - return resp.then([] { - return make_ready_future(json_void()); - }); - }); - - ss::del_snapshot.set(r, [](std::unique_ptr req) { - auto tag = req->get_query_param("tag"); - - std::vector keynames = split(req->get_query_param("kn"), ","); - return service::get_local_storage_service().clear_snapshot(tag, keynames).then([] { - return make_ready_future(json_void()); - }); - }); - - ss::true_snapshots_size.set(r, [](std::unique_ptr req) { - return service::get_local_storage_service().true_snapshots_size().then([] (int64_t size) { - return make_ready_future(size); - }); - }); - ss::force_keyspace_compaction.set(r, [&ctx](std::unique_ptr req) { auto keyspace = validate_keyspace(ctx, req->param); auto column_families = split_cf(req->get_query_param("cf")); @@ -326,31 +268,6 @@ void set_storage_service(http_context& ctx, routes& r) { }); }); - ss::scrub.set(r, wrap_ks_cf(ctx, [] (http_context& ctx, std::unique_ptr req, sstring keyspace, std::vector column_families) { - // TODO: respect this - auto skip_corrupted = req->get_query_param("skip_corrupted"); - - auto f = make_ready_future<>(); - if (!req_param(*req, "disable_snapshot", false)) { - auto tag = format("pre-scrub-{:d}", db_clock::now().time_since_epoch().count()); - f = parallel_for_each(column_families, [keyspace, tag](sstring cf) { - return service::get_local_storage_service().take_column_family_snapshot(keyspace, cf, tag); - }); - } - - return f.then([&ctx, keyspace, column_families] { - return ctx.db.invoke_on_all([=] (database& db) { - return do_for_each(column_families, [=, &db](sstring cfname) { - auto& cm = db.get_compaction_manager(); - auto& cf = db.find_column_family(keyspace, cfname); - return cm.perform_sstable_scrub(&cf); - }); - }); - }).then([]{ - return make_ready_future(0); - }); - })); - ss::upgrade_sstables.set(r, wrap_ks_cf(ctx, [] (http_context& ctx, std::unique_ptr req, sstring keyspace, std::vector column_families) { bool exclude_current_version = req_param(*req, "exclude_current_version", false); @@ -1037,4 +954,89 @@ void set_storage_service(http_context& ctx, routes& r) { } +void set_snapshot(http_context& ctx, routes& r) { + ss::get_snapshot_details.set(r, [](std::unique_ptr req) { + return service::get_local_storage_service().get_snapshot_details().then([] (auto result) { + std::vector res; + for (auto& map: result) { + ss::snapshots all_snapshots; + all_snapshots.key = map.first; + + std::vector snapshot; + for (auto& cf: map.second) { + ss::snapshot s; + s.ks = cf.ks; + s.cf = cf.cf; + s.live = cf.live; + s.total = cf.total; + snapshot.push_back(std::move(s)); + } + all_snapshots.value = std::move(snapshot); + res.push_back(std::move(all_snapshots)); + } + return make_ready_future(std::move(res)); + }); + }); + + ss::take_snapshot.set(r, [](std::unique_ptr req) { + auto tag = req->get_query_param("tag"); + auto column_family = req->get_query_param("cf"); + + std::vector keynames = split(req->get_query_param("kn"), ","); + + auto resp = make_ready_future<>(); + if (column_family.empty()) { + resp = service::get_local_storage_service().take_snapshot(tag, keynames); + } else { + if (keynames.size() > 1) { + throw httpd::bad_param_exception("Only one keyspace allowed when specifying a column family"); + } + resp = service::get_local_storage_service().take_column_family_snapshot(keynames[0], column_family, tag); + } + return resp.then([] { + return make_ready_future(json_void()); + }); + }); + + ss::del_snapshot.set(r, [](std::unique_ptr req) { + auto tag = req->get_query_param("tag"); + + std::vector keynames = split(req->get_query_param("kn"), ","); + return service::get_local_storage_service().clear_snapshot(tag, keynames).then([] { + return make_ready_future(json_void()); + }); + }); + + ss::true_snapshots_size.set(r, [](std::unique_ptr req) { + return service::get_local_storage_service().true_snapshots_size().then([] (int64_t size) { + return make_ready_future(size); + }); + }); + + ss::scrub.set(r, wrap_ks_cf(ctx, [] (http_context& ctx, std::unique_ptr req, sstring keyspace, std::vector column_families) { + // TODO: respect this + auto skip_corrupted = req->get_query_param("skip_corrupted"); + + auto f = make_ready_future<>(); + if (!req_param(*req, "disable_snapshot", false)) { + auto tag = format("pre-scrub-{:d}", db_clock::now().time_since_epoch().count()); + f = parallel_for_each(column_families, [keyspace, tag](sstring cf) { + return service::get_local_storage_service().take_column_family_snapshot(keyspace, cf, tag); + }); + } + + return f.then([&ctx, keyspace, column_families] { + return ctx.db.invoke_on_all([=] (database& db) { + return do_for_each(column_families, [=, &db](sstring cfname) { + auto& cm = db.get_compaction_manager(); + auto& cf = db.find_column_family(keyspace, cfname); + return cm.perform_sstable_scrub(&cf); + }); + }); + }).then([]{ + return make_ready_future(0); + }); + })); +} + } diff --git a/api/storage_service.hh b/api/storage_service.hh index 301b60fa7d..e97480fd33 100644 --- a/api/storage_service.hh +++ b/api/storage_service.hh @@ -26,5 +26,6 @@ namespace api { void set_storage_service(http_context& ctx, routes& r); +void set_snapshot(http_context& ctx, routes& r); } diff --git a/main.cc b/main.cc index bab004efa7..ba6f3c5d49 100644 --- a/main.cc +++ b/main.cc @@ -947,6 +947,8 @@ int main(int ac, char** av) { api::set_server_messaging_service(ctx).get(); api::set_server_storage_service(ctx).get(); ss.init_server_without_the_messaging_service_part().get(); + api::set_server_snapshot(ctx).get(); + supervisor::notify("starting batchlog manager"); db::get_batchlog_manager().invoke_on_all([] (db::batchlog_manager& b) { return b.start(); diff --git a/service/storage_service.cc b/service/storage_service.cc index a94864a7ec..b60aa4d712 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -2019,11 +2019,6 @@ future<> storage_service::take_snapshot(sstring tag, std::vector keyspa }; return run_snapshot_modify_operation([tag = std::move(tag), keyspace_names = std::move(keyspace_names), this] { - auto mode = get_local_storage_service()._operation_mode; - if (mode == storage_service::mode::JOINING) { - throw std::runtime_error("Cannot snapshot until bootstrap completes"); - } - return parallel_for_each(keyspace_names, [tag, this] (auto& ks_name) { return check_snapshot_not_exist(_db.local(), ks_name, tag); }).then([this, tag, keyspace_names] { @@ -2057,10 +2052,6 @@ future<> storage_service::take_column_family_snapshot(sstring ks_name, sstring c } return run_snapshot_modify_operation([this, ks_name = std::move(ks_name), cf_name = std::move(cf_name), tag = std::move(tag)] { - auto mode = get_local_storage_service()._operation_mode; - if (mode == storage_service::mode::JOINING) { - throw std::runtime_error("Cannot snapshot until bootstrap completes"); - } return check_snapshot_not_exist(_db.local(), ks_name, tag).then([this, ks_name, cf_name, tag] { return _db.invoke_on_all([ks_name, cf_name, tag] (database &db) { auto& cf = db.find_column_family(ks_name, cf_name); From 976463f62067301afe59c831abd8e62a38722127 Mon Sep 17 00:00:00 2001 From: Pavel Emelyanov Date: Fri, 20 Dec 2019 13:57:14 +0300 Subject: [PATCH 3/3] snapshot: Pass requests through gate When the scylla process is stopped no code waits for current snapshot operations to finish. Also, the API server is not stopped either, so new snapshot requests can creep into. In seastar there's a useful abstraction to address both. Signed-off-by: Pavel Emelyanov --- main.cc | 3 +++ service/storage_service.cc | 14 ++++++++++---- service/storage_service.hh | 7 +++++++ 3 files changed, 20 insertions(+), 4 deletions(-) diff --git a/main.cc b/main.cc index ba6f3c5d49..dd8bed3995 100644 --- a/main.cc +++ b/main.cc @@ -948,6 +948,9 @@ int main(int ac, char** av) { api::set_server_storage_service(ctx).get(); ss.init_server_without_the_messaging_service_part().get(); api::set_server_snapshot(ctx).get(); + auto stop_snapshots = defer_verbose_shutdown("snapshots", [] { + service::get_storage_service().invoke_on_all(&service::storage_service::snapshots_close).get(); + }); supervisor::notify("starting batchlog manager"); db::get_batchlog_manager().invoke_on_all([] (db::batchlog_manager& b) { diff --git a/service/storage_service.cc b/service/storage_service.cc index b60aa4d712..a30a7c397d 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -1997,15 +1997,21 @@ future<> check_snapshot_not_exist(database& db, sstring ks_name, sstring name) { template std::result_of_t storage_service::run_snapshot_modify_operation(Func&& f) { - return smp::submit_to(0, [f = std::move(f)] () mutable { - return with_lock(get_local_storage_service()._snapshot_lock.for_write(), std::move(f)); + auto& ss = get_storage_service(); + return with_gate(ss.local()._snapshot_ops, [f = std::move(f), &ss] () { + return ss.invoke_on(0, [f = std::move(f)] (auto& ss) mutable { + return with_lock(ss._snapshot_lock.for_write(), std::move(f)); + }); }); } template std::result_of_t storage_service::run_snapshot_list_operation(Func&& f) { - return smp::submit_to(0, [f = std::move(f)] () mutable { - return with_lock(get_local_storage_service()._snapshot_lock.for_read(), std::move(f)); + auto& ss = get_storage_service(); + return with_gate(ss.local()._snapshot_ops, [f = std::move(f), &ss] () { + return ss.invoke_on(0, [f = std::move(f)] (auto& ss) mutable { + return with_lock(ss._snapshot_lock.for_read(), std::move(f)); + }); }); } diff --git a/service/storage_service.hh b/service/storage_service.hh index d32ae6ae0e..4503c41db1 100644 --- a/service/storage_service.hh +++ b/service/storage_service.hh @@ -52,6 +52,7 @@ #include "gms/application_state.hh" #include "db/system_distributed_keyspace.hh" #include +#include #include "utils/fb_utilities.hh" #include "utils/serialized_action.hh" #include "database_fwd.hh" @@ -265,6 +266,7 @@ private: bool _joined = false; seastar::rwlock _snapshot_lock; + seastar::gate _snapshot_ops; template static std::result_of_t run_snapshot_modify_operation(Func&&); @@ -273,6 +275,11 @@ private: static std::result_of_t run_snapshot_list_operation(Func&&); public: enum class mode { STARTING, NORMAL, JOINING, LEAVING, DECOMMISSIONED, MOVING, DRAINING, DRAINED }; + + future<> snapshots_close() { + return _snapshot_ops.close(); + } + private: mode _operation_mode = mode::STARTING; friend std::ostream& operator<<(std::ostream& os, const mode& mode);