Merge "Remove sub-mode booleans from storage service" from Pavel Emelyanov

"
There's an _operation_mode enum sitting on storage_service that indicates the
top-level state of the scylla node. Next to it there's a bunch of booleans
that define (and duplicate) some sub-modes. These booleans just make the code
more obscure and complicated. This set removes all those booleans and patches
all the relevant checks/calls/methods to rely only on the operation mode.
Also, the switching between modes is simplified down to some bare minimum.

tests: unit(dev) dtest.simple_boot_shutdown(dev) manual(dev)

Manual test included start-stop, nodetool enablegossip, disablegossip and drain
commands, scylla-cly is_initialized and is_joined calls

As noticed in v2, this set changes the log messages that are checked by
dtests. The fix for dtest, that's compatible with both -- current scylla and
this patchset -- is already in dtest master.
"

* 'br-remove-bools-from-storage-service-3-rebase' of https://github.com/xemul/scylla:
  storage_service: Relax operation modes switch
  storage_service: Remove _ms_stopped
  storage_service: Remove _is_bootstrap_mode
  storage_service: Remove _initialized and is_initialized()
  storage_service: Remove _joined and is_joined()
  storage_service: Replace is_starting() with get_operation_mode()
  storage_service: Make get_operation_mode() return mode itself
  storage_service: Relax repeating set_mode-s
This commit is contained in:
Botond Dénes
2022-03-07 15:27:03 +02:00
3 changed files with 53 additions and 92 deletions

View File

@@ -763,13 +763,13 @@ void set_storage_service(http_context& ctx, routes& r, sharded<service::storage_
ss::get_operation_mode.set(r, [&ss](std::unique_ptr<request> req) {
return ss.local().get_operation_mode().then([] (auto mode) {
return make_ready_future<json::json_return_type>(mode);
return make_ready_future<json::json_return_type>(format("{}", mode));
});
});
ss::is_starting.set(r, [&ss](std::unique_ptr<request> req) {
return ss.local().is_starting().then([] (auto starting) {
return make_ready_future<json::json_return_type>(starting);
return ss.local().get_operation_mode().then([] (auto mode) {
return make_ready_future<json::json_return_type>(mode <= service::storage_service::mode::STARTING);
});
});
@@ -839,9 +839,13 @@ void set_storage_service(http_context& ctx, routes& r, sharded<service::storage_
return make_ready_future<json::json_return_type>(json_void());
});
ss::is_initialized.set(r, [&ss](std::unique_ptr<request> req) {
return ss.local().is_initialized().then([] (bool initialized) {
return make_ready_future<json::json_return_type>(initialized);
ss::is_initialized.set(r, [&ss, &g](std::unique_ptr<request> req) {
return ss.local().get_operation_mode().then([&g] (auto mode) {
bool is_initialized = mode >= service::storage_service::mode::STARTING;
if (mode == service::storage_service::mode::NORMAL) {
is_initialized = g.is_enabled();
}
return make_ready_future<json::json_return_type>(is_initialized);
});
});
@@ -850,7 +854,9 @@ void set_storage_service(http_context& ctx, routes& r, sharded<service::storage_
});
ss::is_joined.set(r, [&ss] (std::unique_ptr<request> req) {
return make_ready_future<json::json_return_type>(ss.local().is_joined());
return ss.local().get_operation_mode().then([] (auto mode) {
return make_ready_future<json::json_return_type>(mode >= service::storage_service::mode::JOINING);
});
});
ss::set_stream_throughput_mb_per_sec.set(r, [](std::unique_ptr<request> req) {

View File

@@ -138,7 +138,9 @@ enum class node_external_status {
static node_external_status map_operation_mode(storage_service::mode m) {
switch (m) {
case storage_service::mode::NONE: return node_external_status::STARTING;
case storage_service::mode::STARTING: return node_external_status::STARTING;
case storage_service::mode::BOOTSTRAP: return node_external_status::JOINING;
case storage_service::mode::JOINING: return node_external_status::JOINING;
case storage_service::mode::NORMAL: return node_external_status::NORMAL;
case storage_service::mode::LEAVING: return node_external_status::LEAVING;
@@ -413,11 +415,7 @@ future<> storage_service::wait_for_ring_to_settle(std::chrono::milliseconds dela
// Runs inside seastar::async context
void storage_service::join_token_ring(std::chrono::milliseconds delay) {
// This function only gets called on shard 0, but we want to set _joined
// on all shards, so this variable can be later read locally.
container().invoke_on_all([] (auto&& ss) {
ss._joined = true;
}).get();
set_mode(mode::JOINING);
_group0->join_group0().get();
@@ -437,7 +435,7 @@ void storage_service::join_token_ring(std::chrono::milliseconds delay) {
} else {
db::system_keyspace::set_bootstrap_state(db::system_keyspace::bootstrap_state::IN_PROGRESS).get();
}
set_mode(mode::JOINING, "waiting for ring information", true);
slogger.info("waiting for ring information");
// if our schema hasn't matched yet, keep sleeping until it does
// (post CASSANDRA-1391 we don't expect this to be necessary very often, but it doesn't hurt to be careful)
@@ -449,7 +447,7 @@ void storage_service::join_token_ring(std::chrono::milliseconds delay) {
if (tmptr->is_member(get_broadcast_address())) {
throw std::runtime_error("This node is already a member of the token ring; bootstrap aborted. (If replacing a dead node, remove the old one from the ring first.)");
}
set_mode(mode::JOINING, "getting bootstrap token", true);
slogger.info("getting bootstrap token");
if (resume_bootstrap) {
_bootstrap_tokens = db::system_keyspace::get_saved_tokens().get0();
if (!_bootstrap_tokens.empty()) {
@@ -483,7 +481,7 @@ void storage_service::join_token_ring(std::chrono::milliseconds delay) {
} else {
sleep_abortable(get_ring_delay(), _abort_source).get();
}
set_mode(mode::JOINING, format("Replacing a node with token(s): {}", _bootstrap_tokens), true);
slogger.info("Replacing a node with token(s): {}", _bootstrap_tokens);
// _bootstrap_tokens was previously set in prepare_to_join using tokens gossiped by the replaced node
}
maybe_start_sys_dist_ks();
@@ -565,7 +563,7 @@ void storage_service::join_token_ring(std::chrono::milliseconds delay) {
// start participating in the ring.
set_gossip_tokens(_gossiper, _bootstrap_tokens, _cdc_gen_id).get();
set_mode(mode::NORMAL, "node is now in normal status", true);
set_mode(mode::NORMAL);
if (get_token_metadata().sorted_tokens().empty()) {
auto err = format("join_token_ring: Sorted token in token_metadata is empty");
@@ -610,10 +608,9 @@ std::list<gms::inet_address> storage_service::get_ignore_dead_nodes_for_replace(
// Runs inside seastar::async context
void storage_service::bootstrap() {
_is_bootstrap_mode = true;
auto x = seastar::defer([this] { _is_bootstrap_mode = false; });
auto bootstrap_rbno = is_repair_based_node_ops_enabled(streaming::stream_reason::bootstrap);
set_mode(mode::BOOTSTRAP);
slogger.debug("bootstrap: rbno={} replacing={}", bootstrap_rbno, is_replacing());
if (!is_replacing()) {
// Wait until we know tokens of existing node before announcing join status.
@@ -667,7 +664,7 @@ void storage_service::bootstrap() {
{ gms::application_state::STATUS, versioned_value::bootstrapping(_bootstrap_tokens) },
}).get();
set_mode(mode::JOINING, format("sleeping {} ms for pending range setup", get_ring_delay().count()), true);
slogger.info("sleeping {} ms for pending range setup", get_ring_delay().count());
_gossiper.wait_for_range_setup().get();
} else {
// Even with RBNO bootstrap we need to announce the new CDC generation immediately after it's created.
@@ -677,7 +674,7 @@ void storage_service::bootstrap() {
}
} else {
// Wait until we know tokens of existing node before announcing replacing status.
set_mode(mode::JOINING, fmt::format("Wait until local node knows tokens of peer nodes"), true);
slogger.info("Wait until local node knows tokens of peer nodes");
_gossiper.wait_for_range_setup().get();
auto replace_addr = get_replace_address();
slogger.debug("Removing replaced endpoint {} from system.peers", *replace_addr);
@@ -691,7 +688,7 @@ void storage_service::bootstrap() {
}
}).get();
set_mode(mode::JOINING, "Starting to bootstrap...", true);
slogger.info("Starting to bootstrap...");
if (is_replacing()) {
run_replace_ops();
} else {
@@ -1308,7 +1305,7 @@ future<> storage_service::init_server(cql3::query_processor& qp) {
assert(this_shard_id() == 0);
return seastar::async([this, &qp] {
_initialized = true;
set_mode(mode::STARTING);
_group0 = std::make_unique<raft_group0>(_abort_source, _raft_gr, _messaging.local(),
_gossiper, qp, _migration_manager.local());
@@ -1659,9 +1656,11 @@ future<std::map<gms::inet_address, float>> storage_service::effective_ownership(
}
static const std::map<storage_service::mode, sstring> mode_names = {
{storage_service::mode::NONE, "STARTING"},
{storage_service::mode::STARTING, "STARTING"},
{storage_service::mode::NORMAL, "NORMAL"},
{storage_service::mode::JOINING, "JOINING"},
{storage_service::mode::BOOTSTRAP, "BOOTSTRAP"},
{storage_service::mode::LEAVING, "LEAVING"},
{storage_service::mode::DECOMMISSIONED, "DECOMMISSIONED"},
{storage_service::mode::MOVING, "MOVING"},
@@ -1674,16 +1673,15 @@ std::ostream& operator<<(std::ostream& os, const storage_service::mode& m) {
return os;
}
void storage_service::set_mode(mode m, bool log) {
set_mode(m, "", log);
}
void storage_service::set_mode(mode m, sstring msg, bool log) {
_operation_mode = m;
if (log) {
slogger.info("{}: {}", m, msg);
void storage_service::set_mode(mode m) {
if (m != _operation_mode) {
slogger.info("entering {} mode", m);
_operation_mode = m;
} else {
slogger.debug("{}: {}", m, msg);
// This shouldn't happen, but it's too much for an assert,
// so -- just emit a warning in the hope that it will be
// noticed, reported and fixed
slogger.warn("re-entering {} mode", m);
}
}
@@ -1737,17 +1735,9 @@ future<std::unordered_map<sstring, std::vector<sstring>>> storage_service::descr
});
};
future<sstring> storage_service::get_operation_mode() {
future<storage_service::mode> storage_service::get_operation_mode() {
return run_with_no_api_lock([] (storage_service& ss) {
auto mode = ss._operation_mode;
return make_ready_future<sstring>(format("{}", mode));
});
}
future<bool> storage_service::is_starting() {
return run_with_no_api_lock([] (storage_service& ss) {
auto mode = ss._operation_mode;
return mode == storage_service::mode::STARTING;
return make_ready_future<mode>(ss._operation_mode);
});
}
@@ -1760,7 +1750,7 @@ future<bool> storage_service::is_gossip_running() {
future<> storage_service::start_gossiping() {
return run_with_api_lock(sstring("start_gossiping"), [] (storage_service& ss) {
return seastar::async([&ss] {
if (!ss._initialized) {
if (!ss._gossiper.is_enabled()) {
slogger.warn("Starting gossip by operator request");
ss._gossiper.container().invoke_on_all(&gms::gossiper::start).get();
auto undo = defer([&ss] { ss._gossiper.container().invoke_on_all(&gms::gossiper::stop).get(); });
@@ -1772,9 +1762,7 @@ future<> storage_service::start_gossiping() {
db::system_keyspace::get_local_tokens().get0(),
cdc_gen_ts).get();
ss._gossiper.force_newer_generation();
ss._gossiper.start_gossiping(utils::get_generation_number()).then([&ss] {
ss._initialized = true;
}).get();
ss._gossiper.start_gossiping(utils::get_generation_number()).get();
undo.cancel();
}
});
@@ -1783,21 +1771,15 @@ future<> storage_service::start_gossiping() {
future<> storage_service::stop_gossiping() {
return run_with_api_lock(sstring("stop_gossiping"), [] (storage_service& ss) {
if (ss._initialized) {
if (ss._gossiper.is_enabled()) {
slogger.warn("Stopping gossip by operator request");
return ss._gossiper.container().invoke_on_all(&gms::gossiper::stop).then([&ss] {
ss._initialized = false;
});
return ss._gossiper.container().invoke_on_all(&gms::gossiper::stop);
}
return make_ready_future<>();
});
}
future<> storage_service::do_stop_ms() {
if (_ms_stopped) {
return make_ready_future<>();
}
_ms_stopped = true;
return _messaging.invoke_on_all([] (auto& ms) {
return ms.shutdown();
}).then([] {
@@ -1966,7 +1948,7 @@ future<> storage_service::decommission() {
// StageManager.shutdownNow();
db::system_keyspace::set_bootstrap_state(db::system_keyspace::bootstrap_state::DECOMMISSIONED).get();
slogger.info("DECOMMISSIONING: set_bootstrap_state done");
ss.set_mode(mode::DECOMMISSIONED, true);
ss.set_mode(mode::DECOMMISSIONED);
slogger.info("DECOMMISSIONING: done");
// let op be responsible for killing the process
});
@@ -2590,10 +2572,10 @@ future<> storage_service::drain() {
return make_ready_future<>();
}
ss.set_mode(mode::DRAINING, "starting drain process", true);
ss.set_mode(mode::DRAINING);
return ss.do_drain().then([&ss] {
ss._drain_finished.set_value();
ss.set_mode(mode::DRAINED, true);
ss.set_mode(mode::DRAINED);
});
});
}
@@ -2607,10 +2589,10 @@ future<> storage_service::do_drain() {
return bm.drain();
});
set_mode(mode::DRAINING, "shutting down migration manager", false);
slogger.debug("shutting down migration manager");
co_await _migration_manager.invoke_on_all(&service::migration_manager::drain);
set_mode(mode::DRAINING, "flushing column families", false);
slogger.debug("flushing column families");
co_await _db.invoke_on_all(&replica::database::drain);
}
@@ -2653,12 +2635,6 @@ int32_t storage_service::get_exception_count() {
return 0;
}
future<bool> storage_service::is_initialized() {
return run_with_no_api_lock([] (storage_service& ss) {
return ss._initialized;
});
}
// Runs inside seastar::async context
std::unordered_multimap<dht::token_range, inet_address> storage_service::get_changed_ranges_for_leaving(sstring keyspace_name, inet_address endpoint) {
// First get all ranges the leaving endpoint is responsible for
@@ -2750,7 +2726,7 @@ void storage_service::unbootstrap() {
ranges_to_stream.emplace(keyspace_name, std::move(ranges_mm));
}
set_mode(mode::LEAVING, "replaying batch log and streaming data to other nodes", true);
set_mode(mode::LEAVING);
auto stream_success = stream_ranges(ranges_to_stream);
// Wait for batch log to complete before streaming hints.
@@ -2758,7 +2734,7 @@ void storage_service::unbootstrap() {
// Start with BatchLog replay, which may create hints but no writes since this is no longer a valid endpoint.
get_batchlog_manager().local().do_batch_log_replay().get();
set_mode(mode::LEAVING, "streaming hints to other nodes", true);
slogger.info("streaming hints to other nodes");
// wait for the transfer runnables to signal the latch.
slogger.debug("waiting for stream acks.");
@@ -3541,7 +3517,7 @@ future<bool> storage_service::is_cleanup_allowed(sstring keyspace) {
return container().invoke_on(0, [keyspace = std::move(keyspace)] (storage_service& ss) {
auto my_address = ss.get_broadcast_address();
auto pending_ranges = ss.get_token_metadata().has_pending_ranges(keyspace, my_address);
bool is_bootstrap_mode = ss._is_bootstrap_mode;
bool is_bootstrap_mode = ss._operation_mode == mode::BOOTSTRAP;
slogger.debug("is_cleanup_allowed: keyspace={}, is_bootstrap_mode={}, pending_ranges={}",
keyspace, is_bootstrap_mode, pending_ranges);
return !is_bootstrap_mode && !pending_ranges;

View File

@@ -152,7 +152,6 @@ private:
sharded<repair_service>& _repair;
sharded<streaming::stream_manager>& _stream_manager;
sstring _operation_in_progress;
bool _ms_stopped = false;
seastar::metrics::metric_groups _metrics;
using client_shutdown_hook = noncopyable_function<void()>;
std::vector<protocol_server*> _protocol_servers;
@@ -274,17 +273,10 @@ private:
std::optional<inet_address> _removing_node;
/* Are we starting this node in bootstrap mode? */
bool _is_bootstrap_mode = false;
bool _initialized = false;
bool _joined = false;
public:
enum class mode { STARTING, NORMAL, JOINING, LEAVING, DECOMMISSIONED, MOVING, DRAINING, DRAINED };
enum class mode { NONE, STARTING, JOINING, BOOTSTRAP, NORMAL, LEAVING, DECOMMISSIONED, MOVING, DRAINING, DRAINED };
private:
mode _operation_mode = mode::STARTING;
mode _operation_mode = mode::NONE;
friend std::ostream& operator<<(std::ostream& os, const mode& mode);
/* Used for tracking drain progress */
@@ -342,7 +334,6 @@ private:
future<> wait_for_ring_to_settle(std::chrono::milliseconds delay);
public:
future<bool> is_initialized();
future<> check_for_endpoint_collision(std::unordered_set<gms::inet_address> initial_contact_nodes,
const std::unordered_map<gms::inet_address, sstring>& loaded_peer_features);
@@ -398,17 +389,11 @@ private:
void join_token_ring(std::chrono::milliseconds);
void maybe_start_sys_dist_ks();
public:
inline bool is_joined() const {
// Every time we set _joined, we do it on all shards, so we can read its
// value locally.
return _joined;
}
future<> rebuild(sstring source_dc);
private:
void set_mode(mode m, bool log);
void set_mode(mode m, sstring msg, bool log);
void set_mode(mode m);
void mark_existing_views_as_built();
// Stream data for which we become a new replica.
@@ -417,10 +402,6 @@ private:
void bootstrap();
public:
bool is_bootstrap_mode() const {
return _is_bootstrap_mode;
}
/**
* Return the rpc address associated with an endpoint as a string.
* @param endpoint The endpoint to get rpc address for
@@ -761,9 +742,7 @@ public:
void node_ops_cmd_check(gms::inet_address coordinator, const node_ops_cmd_request& req);
future<> node_ops_cmd_heartbeat_updater(const node_ops_cmd& cmd, utils::UUID uuid, std::list<gms::inet_address> nodes, lw_shared_ptr<bool> heartbeat_updater_done);
future<sstring> get_operation_mode();
future<bool> is_starting();
future<mode> get_operation_mode();
/**
* Shuts node off to writes, empties memtables and the commit log.