diff --git a/api/storage_service.cc b/api/storage_service.cc index 253e4b1096..07f27e30e6 100644 --- a/api/storage_service.cc +++ b/api/storage_service.cc @@ -763,13 +763,13 @@ void set_storage_service(http_context& ctx, routes& r, sharded req) { return ss.local().get_operation_mode().then([] (auto mode) { - return make_ready_future(mode); + return make_ready_future(format("{}", mode)); }); }); ss::is_starting.set(r, [&ss](std::unique_ptr req) { - return ss.local().is_starting().then([] (auto starting) { - return make_ready_future(starting); + return ss.local().get_operation_mode().then([] (auto mode) { + return make_ready_future(mode <= service::storage_service::mode::STARTING); }); }); @@ -839,9 +839,13 @@ void set_storage_service(http_context& ctx, routes& r, sharded(json_void()); }); - ss::is_initialized.set(r, [&ss](std::unique_ptr req) { - return ss.local().is_initialized().then([] (bool initialized) { - return make_ready_future(initialized); + ss::is_initialized.set(r, [&ss, &g](std::unique_ptr req) { + return ss.local().get_operation_mode().then([&g] (auto mode) { + bool is_initialized = mode >= service::storage_service::mode::STARTING; + if (mode == service::storage_service::mode::NORMAL) { + is_initialized = g.is_enabled(); + } + return make_ready_future(is_initialized); }); }); @@ -850,7 +854,9 @@ void set_storage_service(http_context& ctx, routes& r, sharded req) { - return make_ready_future(ss.local().is_joined()); + return ss.local().get_operation_mode().then([] (auto mode) { + return make_ready_future(mode >= service::storage_service::mode::JOINING); + }); }); ss::set_stream_throughput_mb_per_sec.set(r, [](std::unique_ptr req) { diff --git a/service/storage_service.cc b/service/storage_service.cc index 115ae230a1..28fd57e875 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -138,7 +138,9 @@ enum class node_external_status { static node_external_status map_operation_mode(storage_service::mode m) { switch (m) { + case storage_service::mode::NONE: return node_external_status::STARTING; case storage_service::mode::STARTING: return node_external_status::STARTING; + case storage_service::mode::BOOTSTRAP: return node_external_status::JOINING; case storage_service::mode::JOINING: return node_external_status::JOINING; case storage_service::mode::NORMAL: return node_external_status::NORMAL; case storage_service::mode::LEAVING: return node_external_status::LEAVING; @@ -413,11 +415,7 @@ future<> storage_service::wait_for_ring_to_settle(std::chrono::milliseconds dela // Runs inside seastar::async context void storage_service::join_token_ring(std::chrono::milliseconds delay) { - // This function only gets called on shard 0, but we want to set _joined - // on all shards, so this variable can be later read locally. - container().invoke_on_all([] (auto&& ss) { - ss._joined = true; - }).get(); + set_mode(mode::JOINING); _group0->join_group0().get(); @@ -437,7 +435,7 @@ void storage_service::join_token_ring(std::chrono::milliseconds delay) { } else { db::system_keyspace::set_bootstrap_state(db::system_keyspace::bootstrap_state::IN_PROGRESS).get(); } - set_mode(mode::JOINING, "waiting for ring information", true); + slogger.info("waiting for ring information"); // if our schema hasn't matched yet, keep sleeping until it does // (post CASSANDRA-1391 we don't expect this to be necessary very often, but it doesn't hurt to be careful) @@ -449,7 +447,7 @@ void storage_service::join_token_ring(std::chrono::milliseconds delay) { if (tmptr->is_member(get_broadcast_address())) { throw std::runtime_error("This node is already a member of the token ring; bootstrap aborted. (If replacing a dead node, remove the old one from the ring first.)"); } - set_mode(mode::JOINING, "getting bootstrap token", true); + slogger.info("getting bootstrap token"); if (resume_bootstrap) { _bootstrap_tokens = db::system_keyspace::get_saved_tokens().get0(); if (!_bootstrap_tokens.empty()) { @@ -483,7 +481,7 @@ void storage_service::join_token_ring(std::chrono::milliseconds delay) { } else { sleep_abortable(get_ring_delay(), _abort_source).get(); } - set_mode(mode::JOINING, format("Replacing a node with token(s): {}", _bootstrap_tokens), true); + slogger.info("Replacing a node with token(s): {}", _bootstrap_tokens); // _bootstrap_tokens was previously set in prepare_to_join using tokens gossiped by the replaced node } maybe_start_sys_dist_ks(); @@ -565,7 +563,7 @@ void storage_service::join_token_ring(std::chrono::milliseconds delay) { // start participating in the ring. set_gossip_tokens(_gossiper, _bootstrap_tokens, _cdc_gen_id).get(); - set_mode(mode::NORMAL, "node is now in normal status", true); + set_mode(mode::NORMAL); if (get_token_metadata().sorted_tokens().empty()) { auto err = format("join_token_ring: Sorted token in token_metadata is empty"); @@ -610,10 +608,9 @@ std::list storage_service::get_ignore_dead_nodes_for_replace( // Runs inside seastar::async context void storage_service::bootstrap() { - _is_bootstrap_mode = true; - auto x = seastar::defer([this] { _is_bootstrap_mode = false; }); auto bootstrap_rbno = is_repair_based_node_ops_enabled(streaming::stream_reason::bootstrap); + set_mode(mode::BOOTSTRAP); slogger.debug("bootstrap: rbno={} replacing={}", bootstrap_rbno, is_replacing()); if (!is_replacing()) { // Wait until we know tokens of existing node before announcing join status. @@ -667,7 +664,7 @@ void storage_service::bootstrap() { { gms::application_state::STATUS, versioned_value::bootstrapping(_bootstrap_tokens) }, }).get(); - set_mode(mode::JOINING, format("sleeping {} ms for pending range setup", get_ring_delay().count()), true); + slogger.info("sleeping {} ms for pending range setup", get_ring_delay().count()); _gossiper.wait_for_range_setup().get(); } else { // Even with RBNO bootstrap we need to announce the new CDC generation immediately after it's created. @@ -677,7 +674,7 @@ void storage_service::bootstrap() { } } else { // Wait until we know tokens of existing node before announcing replacing status. - set_mode(mode::JOINING, fmt::format("Wait until local node knows tokens of peer nodes"), true); + slogger.info("Wait until local node knows tokens of peer nodes"); _gossiper.wait_for_range_setup().get(); auto replace_addr = get_replace_address(); slogger.debug("Removing replaced endpoint {} from system.peers", *replace_addr); @@ -691,7 +688,7 @@ void storage_service::bootstrap() { } }).get(); - set_mode(mode::JOINING, "Starting to bootstrap...", true); + slogger.info("Starting to bootstrap..."); if (is_replacing()) { run_replace_ops(); } else { @@ -1308,7 +1305,7 @@ future<> storage_service::init_server(cql3::query_processor& qp) { assert(this_shard_id() == 0); return seastar::async([this, &qp] { - _initialized = true; + set_mode(mode::STARTING); _group0 = std::make_unique(_abort_source, _raft_gr, _messaging.local(), _gossiper, qp, _migration_manager.local()); @@ -1659,9 +1656,11 @@ future> storage_service::effective_ownership( } static const std::map mode_names = { + {storage_service::mode::NONE, "STARTING"}, {storage_service::mode::STARTING, "STARTING"}, {storage_service::mode::NORMAL, "NORMAL"}, {storage_service::mode::JOINING, "JOINING"}, + {storage_service::mode::BOOTSTRAP, "BOOTSTRAP"}, {storage_service::mode::LEAVING, "LEAVING"}, {storage_service::mode::DECOMMISSIONED, "DECOMMISSIONED"}, {storage_service::mode::MOVING, "MOVING"}, @@ -1674,16 +1673,15 @@ std::ostream& operator<<(std::ostream& os, const storage_service::mode& m) { return os; } -void storage_service::set_mode(mode m, bool log) { - set_mode(m, "", log); -} - -void storage_service::set_mode(mode m, sstring msg, bool log) { - _operation_mode = m; - if (log) { - slogger.info("{}: {}", m, msg); +void storage_service::set_mode(mode m) { + if (m != _operation_mode) { + slogger.info("entering {} mode", m); + _operation_mode = m; } else { - slogger.debug("{}: {}", m, msg); + // This shouldn't happen, but it's too much for an assert, + // so -- just emit a warning in the hope that it will be + // noticed, reported and fixed + slogger.warn("re-entering {} mode", m); } } @@ -1737,17 +1735,9 @@ future>> storage_service::descr }); }; -future storage_service::get_operation_mode() { +future storage_service::get_operation_mode() { return run_with_no_api_lock([] (storage_service& ss) { - auto mode = ss._operation_mode; - return make_ready_future(format("{}", mode)); - }); -} - -future storage_service::is_starting() { - return run_with_no_api_lock([] (storage_service& ss) { - auto mode = ss._operation_mode; - return mode == storage_service::mode::STARTING; + return make_ready_future(ss._operation_mode); }); } @@ -1760,7 +1750,7 @@ future storage_service::is_gossip_running() { future<> storage_service::start_gossiping() { return run_with_api_lock(sstring("start_gossiping"), [] (storage_service& ss) { return seastar::async([&ss] { - if (!ss._initialized) { + if (!ss._gossiper.is_enabled()) { slogger.warn("Starting gossip by operator request"); ss._gossiper.container().invoke_on_all(&gms::gossiper::start).get(); auto undo = defer([&ss] { ss._gossiper.container().invoke_on_all(&gms::gossiper::stop).get(); }); @@ -1772,9 +1762,7 @@ future<> storage_service::start_gossiping() { db::system_keyspace::get_local_tokens().get0(), cdc_gen_ts).get(); ss._gossiper.force_newer_generation(); - ss._gossiper.start_gossiping(utils::get_generation_number()).then([&ss] { - ss._initialized = true; - }).get(); + ss._gossiper.start_gossiping(utils::get_generation_number()).get(); undo.cancel(); } }); @@ -1783,21 +1771,15 @@ future<> storage_service::start_gossiping() { future<> storage_service::stop_gossiping() { return run_with_api_lock(sstring("stop_gossiping"), [] (storage_service& ss) { - if (ss._initialized) { + if (ss._gossiper.is_enabled()) { slogger.warn("Stopping gossip by operator request"); - return ss._gossiper.container().invoke_on_all(&gms::gossiper::stop).then([&ss] { - ss._initialized = false; - }); + return ss._gossiper.container().invoke_on_all(&gms::gossiper::stop); } return make_ready_future<>(); }); } future<> storage_service::do_stop_ms() { - if (_ms_stopped) { - return make_ready_future<>(); - } - _ms_stopped = true; return _messaging.invoke_on_all([] (auto& ms) { return ms.shutdown(); }).then([] { @@ -1966,7 +1948,7 @@ future<> storage_service::decommission() { // StageManager.shutdownNow(); db::system_keyspace::set_bootstrap_state(db::system_keyspace::bootstrap_state::DECOMMISSIONED).get(); slogger.info("DECOMMISSIONING: set_bootstrap_state done"); - ss.set_mode(mode::DECOMMISSIONED, true); + ss.set_mode(mode::DECOMMISSIONED); slogger.info("DECOMMISSIONING: done"); // let op be responsible for killing the process }); @@ -2590,10 +2572,10 @@ future<> storage_service::drain() { return make_ready_future<>(); } - ss.set_mode(mode::DRAINING, "starting drain process", true); + ss.set_mode(mode::DRAINING); return ss.do_drain().then([&ss] { ss._drain_finished.set_value(); - ss.set_mode(mode::DRAINED, true); + ss.set_mode(mode::DRAINED); }); }); } @@ -2607,10 +2589,10 @@ future<> storage_service::do_drain() { return bm.drain(); }); - set_mode(mode::DRAINING, "shutting down migration manager", false); + slogger.debug("shutting down migration manager"); co_await _migration_manager.invoke_on_all(&service::migration_manager::drain); - set_mode(mode::DRAINING, "flushing column families", false); + slogger.debug("flushing column families"); co_await _db.invoke_on_all(&replica::database::drain); } @@ -2653,12 +2635,6 @@ int32_t storage_service::get_exception_count() { return 0; } -future storage_service::is_initialized() { - return run_with_no_api_lock([] (storage_service& ss) { - return ss._initialized; - }); -} - // Runs inside seastar::async context std::unordered_multimap storage_service::get_changed_ranges_for_leaving(sstring keyspace_name, inet_address endpoint) { // First get all ranges the leaving endpoint is responsible for @@ -2750,7 +2726,7 @@ void storage_service::unbootstrap() { ranges_to_stream.emplace(keyspace_name, std::move(ranges_mm)); } - set_mode(mode::LEAVING, "replaying batch log and streaming data to other nodes", true); + set_mode(mode::LEAVING); auto stream_success = stream_ranges(ranges_to_stream); // Wait for batch log to complete before streaming hints. @@ -2758,7 +2734,7 @@ void storage_service::unbootstrap() { // Start with BatchLog replay, which may create hints but no writes since this is no longer a valid endpoint. get_batchlog_manager().local().do_batch_log_replay().get(); - set_mode(mode::LEAVING, "streaming hints to other nodes", true); + slogger.info("streaming hints to other nodes"); // wait for the transfer runnables to signal the latch. slogger.debug("waiting for stream acks."); @@ -3541,7 +3517,7 @@ future storage_service::is_cleanup_allowed(sstring keyspace) { return container().invoke_on(0, [keyspace = std::move(keyspace)] (storage_service& ss) { auto my_address = ss.get_broadcast_address(); auto pending_ranges = ss.get_token_metadata().has_pending_ranges(keyspace, my_address); - bool is_bootstrap_mode = ss._is_bootstrap_mode; + bool is_bootstrap_mode = ss._operation_mode == mode::BOOTSTRAP; slogger.debug("is_cleanup_allowed: keyspace={}, is_bootstrap_mode={}, pending_ranges={}", keyspace, is_bootstrap_mode, pending_ranges); return !is_bootstrap_mode && !pending_ranges; diff --git a/service/storage_service.hh b/service/storage_service.hh index c422fff12c..093d5b4f3b 100644 --- a/service/storage_service.hh +++ b/service/storage_service.hh @@ -152,7 +152,6 @@ private: sharded& _repair; sharded& _stream_manager; sstring _operation_in_progress; - bool _ms_stopped = false; seastar::metrics::metric_groups _metrics; using client_shutdown_hook = noncopyable_function; std::vector _protocol_servers; @@ -274,17 +273,10 @@ private: std::optional _removing_node; - /* Are we starting this node in bootstrap mode? */ - bool _is_bootstrap_mode = false; - - bool _initialized = false; - - bool _joined = false; - public: - enum class mode { STARTING, NORMAL, JOINING, LEAVING, DECOMMISSIONED, MOVING, DRAINING, DRAINED }; + enum class mode { NONE, STARTING, JOINING, BOOTSTRAP, NORMAL, LEAVING, DECOMMISSIONED, MOVING, DRAINING, DRAINED }; private: - mode _operation_mode = mode::STARTING; + mode _operation_mode = mode::NONE; friend std::ostream& operator<<(std::ostream& os, const mode& mode); /* Used for tracking drain progress */ @@ -342,7 +334,6 @@ private: future<> wait_for_ring_to_settle(std::chrono::milliseconds delay); public: - future is_initialized(); future<> check_for_endpoint_collision(std::unordered_set initial_contact_nodes, const std::unordered_map& loaded_peer_features); @@ -398,17 +389,11 @@ private: void join_token_ring(std::chrono::milliseconds); void maybe_start_sys_dist_ks(); public: - inline bool is_joined() const { - // Every time we set _joined, we do it on all shards, so we can read its - // value locally. - return _joined; - } future<> rebuild(sstring source_dc); private: - void set_mode(mode m, bool log); - void set_mode(mode m, sstring msg, bool log); + void set_mode(mode m); void mark_existing_views_as_built(); // Stream data for which we become a new replica. @@ -417,10 +402,6 @@ private: void bootstrap(); public: - bool is_bootstrap_mode() const { - return _is_bootstrap_mode; - } - /** * Return the rpc address associated with an endpoint as a string. * @param endpoint The endpoint to get rpc address for @@ -761,9 +742,7 @@ public: void node_ops_cmd_check(gms::inet_address coordinator, const node_ops_cmd_request& req); future<> node_ops_cmd_heartbeat_updater(const node_ops_cmd& cmd, utils::UUID uuid, std::list nodes, lw_shared_ptr heartbeat_updater_done); - future get_operation_mode(); - - future is_starting(); + future get_operation_mode(); /** * Shuts node off to writes, empties memtables and the commit log.