Merge "Some refactoring of node startup code" from Kamil

"
The node startup code (in particular the functions storage_service::prepare_to_join and storage_service::join_token_ring) is complicated and hard to understand.

This patch set aims to simplify it at least a bit by removing some dead code, moving code around so it's easier to understand and adding some comments that explain what the code does.
I did it to help me prepare for implementing generation and gossiping of CDC streams.
"

* 'bootstrap-refactors' of https://github.com/kbr-/scylla:
  storage_service: more comments in join_token_ring
  db: remove system_keyspace::update_local_tokens
  db: improve documentation for update_tokens and get_saved_tokens in system_keyspace
  storage_service: remove storage_service::_is_bootstrap_mode.
  storage_service: simplify storage_service::bootstrap method
  storage_service: fix typo in handle_state_moving
  storage_service: remove unnecessary use of stringstream
  storage_service: remove redundant call to update_tokens during join_token_ring
  storage_service: remove storage_service::set_tokens method.
  storage_service: remove is_survey_mode
  storage_service::handle_state_normal: tokens_to_update* -> owned_tokens
  storage_service::handle_state_normal: remove local_tokens_to_remove
  db::system_keyspace::update_tokens: take tokens by const ref
  db::system_keyspace::prepare_tokens: make static, take tokens by const ref
  token_metadata::update_normal_tokens: take tokens by const ref
This commit is contained in:
Avi Kivity
2019-10-22 12:11:11 +03:00
7 changed files with 93 additions and 162 deletions

View File

@@ -1454,7 +1454,7 @@ future<db_clock::time_point> get_truncated_at(utils::UUID cf_id) {
});
}
set_type_impl::native_type prepare_tokens(std::unordered_set<dht::token>& tokens) {
static set_type_impl::native_type prepare_tokens(const std::unordered_set<dht::token>& tokens) {
set_type_impl::native_type tset;
for (auto& t: tokens) {
tset.push_back(dht::global_partitioner().to_sstring(t));
@@ -1472,10 +1472,7 @@ std::unordered_set<dht::token> decode_tokens(set_type_impl::native_type& tokens)
return tset;
}
/**
* Record tokens being used by another node
*/
future<> update_tokens(gms::inet_address ep, std::unordered_set<dht::token> tokens)
future<> update_tokens(gms::inet_address ep, const std::unordered_set<dht::token>& tokens)
{
if (ep == utils::fb_utilities::get_broadcast_address()) {
return remove_endpoint(ep);
@@ -1488,21 +1485,6 @@ future<> update_tokens(gms::inet_address ep, std::unordered_set<dht::token> toke
});
}
future<std::unordered_set<dht::token>> update_local_tokens(
const std::unordered_set<dht::token> add_tokens,
const std::unordered_set<dht::token> rm_tokens) {
return get_saved_tokens().then([add_tokens = std::move(add_tokens), rm_tokens = std::move(rm_tokens)] (auto tokens) {
for (auto& x : rm_tokens) {
tokens.erase(x);
}
for (auto& x : add_tokens) {
tokens.insert(x);
}
return update_tokens(tokens).then([tokens] {
return tokens;
});
});
}
future<std::unordered_map<gms::inet_address, std::unordered_set<dht::token>>> load_tokens() {
sstring req = format("SELECT peer, tokens FROM system.{}", PEERS);
@@ -1648,10 +1630,7 @@ future<> remove_endpoint(gms::inet_address ep) {
});
}
/**
* This method is used to update the System Keyspace with the new tokens for this node
*/
future<> update_tokens(std::unordered_set<dht::token> tokens) {
future<> update_tokens(const std::unordered_set<dht::token>& tokens) {
if (tokens.empty()) {
throw std::invalid_argument("remove_endpoint should be used instead");
}

View File

@@ -162,8 +162,16 @@ future<> setup(distributed<database>& db,
distributed<cql3::query_processor>& qp,
distributed<service::storage_service>& ss);
future<> update_schema_version(utils::UUID version);
future<> update_tokens(std::unordered_set<dht::token> tokens);
future<> update_tokens(gms::inet_address ep, std::unordered_set<dht::token> tokens);
/*
* Save tokens used by this node in the LOCAL table.
*/
future<> update_tokens(const std::unordered_set<dht::token>& tokens);
/**
* Record tokens being used by another node in the PEERS table.
*/
future<> update_tokens(gms::inet_address ep, const std::unordered_set<dht::token>& tokens);
future<> update_preferred_ip(gms::inet_address ep, gms::inet_address preferred_ip);
future<std::unordered_map<gms::inet_address, gms::inet_address>> get_preferred_ips();
@@ -477,17 +485,6 @@ enum class bootstrap_state {
}
#endif
/**
* Convenience method to update the list of tokens in the local system keyspace.
*
* @param addTokens tokens to add
* @param rmTokens tokens to remove
* @return the collection of persisted tokens
*/
future<std::unordered_set<dht::token>> update_local_tokens(
const std::unordered_set<dht::token> add_tokens,
const std::unordered_set<dht::token> rm_tokens);
/**
* Return a map of stored tokens to IP addresses
*
@@ -500,6 +497,10 @@ enum class bootstrap_state {
*/
future<std::unordered_map<gms::inet_address, utils::UUID>> load_host_ids();
/*
* Read this node's tokens stored in the LOCAL table.
* Used to initialize a restarting node.
*/
future<std::unordered_set<dht::token>> get_saved_tokens();
future<std::unordered_map<gms::inet_address, sstring>> load_peer_features();

View File

@@ -62,9 +62,7 @@ future<> boot_strapper::bootstrap() {
return streamer->add_ranges(keyspace_name, ranges);
}).then([this, streamer] {
_abort_source.check();
return streamer->stream_async().then([streamer] () {
service::get_local_storage_service().finish_bootstrapping();
}).handle_exception([streamer] (std::exception_ptr eptr) {
return streamer->stream_async().handle_exception([streamer] (std::exception_ptr eptr) {
blogger.warn("Error during bootstrap: {}", eptr);
return make_exception_future<>(std::move(eptr));
});

View File

@@ -102,7 +102,7 @@ void token_metadata::update_normal_tokens(std::unordered_set<token> tokens, inet
*
* @param endpointTokens
*/
void token_metadata::update_normal_tokens(std::unordered_map<inet_address, std::unordered_set<token>>& endpoint_tokens) {
void token_metadata::update_normal_tokens(const std::unordered_map<inet_address, std::unordered_set<token>>& endpoint_tokens) {
if (endpoint_tokens.empty()) {
return;
}
@@ -110,7 +110,7 @@ void token_metadata::update_normal_tokens(std::unordered_map<inet_address, std::
bool should_sort_tokens = false;
for (auto&& i : endpoint_tokens) {
inet_address endpoint = i.first;
std::unordered_set<token>& tokens = i.second;
const auto& tokens = i.second;
if (tokens.empty()) {
auto msg = format("tokens is empty in update_normal_tokens");

View File

@@ -234,7 +234,7 @@ public:
const std::vector<token>& sorted_tokens() const;
void update_normal_token(token token, inet_address endpoint);
void update_normal_tokens(std::unordered_set<token> tokens, inet_address endpoint);
void update_normal_tokens(std::unordered_map<inet_address, std::unordered_set<token>>& endpoint_tokens);
void update_normal_tokens(const std::unordered_map<inet_address, std::unordered_set<token>>& endpoint_tokens);
const token& first_token(const token& start) const;
size_t first_token_index(const token& start) const;
std::optional<inet_address> get_endpoint(const token& token) const;

View File

@@ -599,11 +599,9 @@ static auth::service_config auth_service_config_from_db_config(const db::config&
}
void storage_service::maybe_start_sys_dist_ks() {
if (!_is_survey_mode) {
supervisor::notify("starting system distributed keyspace");
_sys_dist_ks.start(std::ref(cql3::get_query_processor()), std::ref(service::get_migration_manager())).get();
_sys_dist_ks.invoke_on_all(&db::system_distributed_keyspace::start).get();
}
supervisor::notify("starting system distributed keyspace");
_sys_dist_ks.start(std::ref(cql3::get_query_processor()), std::ref(service::get_migration_manager())).get();
_sys_dist_ks.invoke_on_all(&db::system_distributed_keyspace::start).get();
}
// Runs inside seastar::async context
@@ -709,19 +707,14 @@ void storage_service::join_token_ring(int delay) {
} else {
sleep_abortable(get_ring_delay(), _abort_source).get();
}
std::stringstream ss;
ss << _bootstrap_tokens;
set_mode(mode::JOINING, format("Replacing a node with token(s): {}", ss.str()), true);
set_mode(mode::JOINING, format("Replacing a node with token(s): {}", _bootstrap_tokens), true);
// _bootstrap_tokens was previously set in prepare_to_join using tokens gossiped by the replaced node
}
maybe_start_sys_dist_ks();
mark_existing_views_as_built();
bootstrap(_bootstrap_tokens);
db::system_keyspace::update_tokens(_bootstrap_tokens).get();
bootstrap();
// bootstrap will block until finished
if (_is_bootstrap_mode) {
auto err = format("We are not supposed in bootstrap mode any more");
slogger.warn("{}", err);
throw std::runtime_error(err);
}
} else {
maybe_start_sys_dist_ks();
size_t num_tokens = _db.local().get_config().num_tokens();
@@ -742,6 +735,7 @@ void storage_service::join_token_ring(int delay) {
}
slogger.info("Saved tokens not found. Using configuration value: {}", _bootstrap_tokens);
}
db::system_keyspace::update_tokens(_bootstrap_tokens).get();
} else {
if (_bootstrap_tokens.size() != num_tokens) {
throw std::runtime_error(format("Cannot change the number of tokens from {:d} to {:d}", _bootstrap_tokens.size(), num_tokens));
@@ -756,35 +750,42 @@ void storage_service::join_token_ring(int delay) {
MigrationManager.announceNewKeyspace(TraceKeyspace.definition(), 0, false);
#endif
if (!_is_survey_mode) {
// start participating in the ring.
db::system_keyspace::set_bootstrap_state(db::system_keyspace::bootstrap_state::COMPLETED).get();
set_tokens(_bootstrap_tokens);
// remove the existing info about the replaced node.
if (!current.empty()) {
for (auto existing : current) {
_gossiper.replaced_endpoint(existing);
}
// At this point our local tokens are chosen (_bootstrap_tokens) and will not be changed unless we bootstrap again.
db::system_keyspace::set_bootstrap_state(db::system_keyspace::bootstrap_state::COMPLETED).get();
slogger.debug("Setting tokens to {}", _bootstrap_tokens);
// This node must know about its chosen tokens before other nodes do
// since they may start sending writes to this node after it gossips status = NORMAL.
// Therefore, in case we haven't updated _token_metadata with our tokens yet, do it now.
_token_metadata.update_normal_tokens(_bootstrap_tokens, get_broadcast_address());
replicate_to_all_cores().get();
// start participating in the ring.
set_gossip_tokens(_bootstrap_tokens);
set_mode(mode::NORMAL, "node is now in normal status", true);
// remove the existing info about the replaced node.
if (!current.empty()) {
for (auto existing : current) {
_gossiper.replaced_endpoint(existing);
}
if (_token_metadata.sorted_tokens().empty()) {
auto err = format("join_token_ring: Sorted token in token_metadata is empty");
slogger.error("{}", err);
throw std::runtime_error(err);
}
_auth_service.start(
permissions_cache_config_from_db_config(_db.local().get_config()),
std::ref(cql3::get_query_processor()),
std::ref(service::get_migration_manager()),
auth_service_config_from_db_config(_db.local().get_config())).get();
_auth_service.invoke_on_all(&auth::service::start).get();
supervisor::notify("starting tracing");
tracing::tracing::start_tracing().get();
} else {
slogger.info("Startup complete, but write survey mode is active, not becoming an active ring member. Use JMX (StorageService->joinRing()) to finalize ring joining.");
}
if (_token_metadata.sorted_tokens().empty()) {
auto err = format("join_token_ring: Sorted token in token_metadata is empty");
slogger.error("{}", err);
throw std::runtime_error(err);
}
_auth_service.start(
permissions_cache_config_from_db_config(_db.local().get_config()),
std::ref(cql3::get_query_processor()),
std::ref(service::get_migration_manager()),
auth_service_config_from_db_config(_db.local().get_config())).get();
_auth_service.invoke_on_all(&auth::service::start).get();
supervisor::notify("starting tracing");
tracing::tracing::start_tracing().get();
}
future<> storage_service::join_ring() {
@@ -793,25 +794,6 @@ future<> storage_service::join_ring() {
if (!ss._joined) {
slogger.info("Joining ring by operator request");
ss.join_token_ring(0);
} else if (ss._is_survey_mode) {
auto tokens = db::system_keyspace::get_saved_tokens().get0();
ss.set_tokens(std::move(tokens));
db::system_keyspace::set_bootstrap_state(db::system_keyspace::bootstrap_state::COMPLETED).get();
ss._is_survey_mode = false;
slogger.info("Leaving write survey mode and joining ring at operator request");
if (ss._token_metadata.sorted_tokens().empty()) {
auto err = format("join_ring: Sorted token in token_metadata is empty");
slogger.error("{}", err);
throw std::runtime_error(err);
}
ss._auth_service.start(
permissions_cache_config_from_db_config(ss._db.local().get_config()),
std::ref(cql3::get_query_processor()),
std::ref(service::get_migration_manager()),
auth_service_config_from_db_config(ss._db.local().get_config())).get();
ss._auth_service.invoke_on_all(&auth::service::start).get();
}
});
});
@@ -820,7 +802,7 @@ future<> storage_service::join_ring() {
bool storage_service::is_joined() {
// Every time we set _joined, we do it on all shards, so we can read its
// value locally.
return _joined && !_is_survey_mode;
return _joined;
}
void storage_service::mark_existing_views_as_built() {
@@ -836,23 +818,20 @@ void storage_service::mark_existing_views_as_built() {
}
// Runs inside seastar::async context
void storage_service::bootstrap(std::unordered_set<token> tokens) {
_is_bootstrap_mode = true;
// DON'T use set_token, that makes us part of the ring locally which is incorrect until we are done bootstrapping
db::system_keyspace::update_tokens(tokens).get();
void storage_service::bootstrap() {
if (!db().local().is_replacing()) {
// Wait until we know tokens of existing node before announcing join status.
_gossiper.wait_for_range_setup().get();
// if not an existing token then bootstrap
_gossiper.add_local_application_state({
{ gms::application_state::TOKENS, value_factory.tokens(tokens) },
{ gms::application_state::STATUS, value_factory.bootstrapping(tokens) },
{ gms::application_state::TOKENS, value_factory.tokens(_bootstrap_tokens) },
{ gms::application_state::STATUS, value_factory.bootstrapping(_bootstrap_tokens) },
}).get();
set_mode(mode::JOINING, format("sleeping {} ms for pending range setup", get_ring_delay().count()), true);
_gossiper.wait_for_range_setup().get();
} else {
// Dont set any state for the node which is bootstrapping the existing token...
_token_metadata.update_normal_tokens(tokens, get_broadcast_address());
_token_metadata.update_normal_tokens(_bootstrap_tokens, get_broadcast_address());
replicate_to_all_cores().get();
auto replace_addr = db().local().get_replace_address();
if (replace_addr) {
@@ -864,9 +843,10 @@ void storage_service::bootstrap(std::unordered_set<token> tokens) {
_gossiper.check_seen_seeds();
set_mode(mode::JOINING, "Starting to bootstrap...", true);
dht::boot_strapper bs(_db, _abort_source, get_broadcast_address(), tokens, _token_metadata);
bs.bootstrap().get(); // handles token update
slogger.info("Bootstrap completed! for the tokens {}", tokens);
dht::boot_strapper bs(_db, _abort_source, get_broadcast_address(), _bootstrap_tokens, _token_metadata);
// Does the actual streaming of newly replicated token ranges.
bs.bootstrap().get();
slogger.info("Bootstrap completed! for the tokens {}", _bootstrap_tokens);
}
sstring
@@ -967,11 +947,6 @@ void storage_service::handle_state_normal(inet_address endpoint) {
slogger.debug("endpoint={} handle_state_normal", endpoint);
auto tokens = get_tokens_for(endpoint);
std::unordered_set<token> tokens_to_update_in_metadata;
std::unordered_set<token> tokens_to_update_in_system_keyspace;
std::unordered_set<token> local_tokens_to_remove;
std::unordered_set<inet_address> endpoints_to_remove;
slogger.debug("Node {} state normal, token {}", endpoint, tokens);
if (_token_metadata.is_member(endpoint)) {
@@ -979,6 +954,8 @@ void storage_service::handle_state_normal(inet_address endpoint) {
}
update_peer_info(endpoint);
std::unordered_set<inet_address> endpoints_to_remove;
// Order Matters, TM.updateHostID() should be called before TM.updateNormalToken(), (see CASSANDRA-4300).
if (_gossiper.uses_host_id(endpoint)) {
auto host_id = _gossiper.get_host_id(endpoint);
@@ -1010,22 +987,24 @@ void storage_service::handle_state_normal(inet_address endpoint) {
}
}
// Tokens owned by the handled endpoint.
// The endpoint broadcasts its set of chosen tokens. If a token was also chosen by another endpoint,
// the collision is resolved by assigning the token to the endpoint which started later.
std::unordered_set<token> owned_tokens;
for (auto t : tokens) {
// we don't want to update if this node is responsible for the token and it has a later startup time than endpoint.
auto current_owner = _token_metadata.get_endpoint(t);
if (!current_owner) {
slogger.debug("handle_state_normal: New node {} at token {}", endpoint, t);
tokens_to_update_in_metadata.insert(t);
tokens_to_update_in_system_keyspace.insert(t);
owned_tokens.insert(t);
} else if (endpoint == *current_owner) {
slogger.debug("handle_state_normal: endpoint={} == current_owner={} token {}", endpoint, *current_owner, t);
// set state back to normal, since the node may have tried to leave, but failed and is now back up
tokens_to_update_in_metadata.insert(t);
tokens_to_update_in_system_keyspace.insert(t);
owned_tokens.insert(t);
} else if (_gossiper.compare_endpoint_startup(endpoint, *current_owner) > 0) {
slogger.debug("handle_state_normal: endpoint={} > current_owner={}, token {}", endpoint, *current_owner, t);
tokens_to_update_in_metadata.insert(t);
tokens_to_update_in_system_keyspace.insert(t);
owned_tokens.insert(t);
// currentOwner is no longer current, endpoint is. Keep track of these moves, because when
// a host no longer has any tokens, we'll want to remove it.
std::multimap<inet_address, token> ep_to_token_copy = get_token_metadata().get_endpoint_to_token_map_for_reading();
@@ -1050,7 +1029,7 @@ void storage_service::handle_state_normal(inet_address endpoint) {
// Update pending ranges after update of normal tokens immediately to avoid
// a race where natural endpoint was updated to contain node A, but A was
// not yet removed from pending endpoints
_token_metadata.update_normal_tokens(tokens_to_update_in_metadata, endpoint);
_token_metadata.update_normal_tokens(owned_tokens, endpoint);
_update_pending_ranges_action.trigger_later().get();
for (auto ep : endpoints_to_remove) {
@@ -1060,9 +1039,9 @@ void storage_service::handle_state_normal(inet_address endpoint) {
_gossiper.replacement_quarantine(ep); // quarantine locally longer than normally; see CASSANDRA-8260
}
}
slogger.debug("handle_state_normal: endpoint={} tokens_to_update_in_system_keyspace = {}", endpoint, tokens_to_update_in_system_keyspace);
if (!tokens_to_update_in_system_keyspace.empty()) {
db::system_keyspace::update_tokens(endpoint, tokens_to_update_in_system_keyspace).then_wrapped([endpoint] (auto&& f) {
slogger.debug("handle_state_normal: endpoint={} owned_tokens = {}", endpoint, owned_tokens);
if (!owned_tokens.empty()) {
db::system_keyspace::update_tokens(endpoint, owned_tokens).then_wrapped([endpoint] (auto&& f) {
try {
f.get();
} catch (...) {
@@ -1071,9 +1050,6 @@ void storage_service::handle_state_normal(inet_address endpoint) {
return make_ready_future<>();
}).get();
}
if (!local_tokens_to_remove.empty()) {
db::system_keyspace::update_local_tokens(std::unordered_set<dht::token>(), local_tokens_to_remove).discard_result().get();
}
// Send joined notification only when this node was not a member prior to this
if (!is_member) {
@@ -1137,7 +1113,7 @@ void storage_service::handle_state_left(inet_address endpoint, std::vector<sstri
}
void storage_service::handle_state_moving(inet_address endpoint, std::vector<sstring> pieces) {
throw std::runtime_error(format("Move opeartion is not supported only more, endpoint={}", endpoint));
throw std::runtime_error(format("Move operation is not supported anymore, endpoint={}", endpoint));
}
void storage_service::handle_state_removing(inet_address endpoint, std::vector<sstring> pieces) {
@@ -1390,16 +1366,6 @@ std::unordered_set<locator::token> storage_service::get_tokens_for(inet_address
}
// Runs inside seastar::async context
void storage_service::set_tokens(std::unordered_set<token> tokens) {
slogger.debug("Setting tokens to {}", tokens);
db::system_keyspace::update_tokens(tokens).get();
auto local_tokens = get_local_tokens().get0();
_token_metadata.update_normal_tokens(tokens, get_broadcast_address());
replicate_to_all_cores().get();
set_gossip_tokens(local_tokens);
set_mode(mode::NORMAL, "node is now in normal status", true);
}
void storage_service::set_gossip_tokens(const std::unordered_set<dht::token>& local_tokens) {
_gossiper.add_local_application_state({
{ gms::application_state::TOKENS, value_factory.tokens(local_tokens) },

View File

@@ -259,13 +259,6 @@ private:
std::optional<inet_address> _removing_node;
/* Are we starting this node in bootstrap mode? */
bool _is_bootstrap_mode;
/* we bootstrap but do NOT join the ring unless told to do so */
// FIXME: System.getProperty("cassandra.write_survey", "false")
bool _is_survey_mode = false;
bool _initialized;
bool _joined = false;
@@ -351,12 +344,6 @@ public:
sstables::sstable_version_types sstables_format() const { return _sstables_format; }
void enable_all_features();
void finish_bootstrapping() {
_is_bootstrap_mode = false;
}
/** This method updates the local token on disk */
void set_tokens(std::unordered_set<token> tokens);
void set_gossip_tokens(const std::unordered_set<dht::token>& local_tokens);
#if 0
@@ -553,13 +540,13 @@ private:
void set_mode(mode m, bool log);
void set_mode(mode m, sstring msg, bool log);
void mark_existing_views_as_built();
// Stream data for which we become a new replica.
// Before that, if we're not replacing another node, inform other nodes about our chosen tokens (_bootstrap_tokens)
// and wait for RING_DELAY ms so that we receive new writes from coordinators during streaming.
void bootstrap();
public:
void bootstrap(std::unordered_set<token> tokens);
bool is_bootstrap_mode() {
return _is_bootstrap_mode;
}
#if 0
public TokenMetadata getTokenMetadata()