Merge "Brush up the initial tokens generation code" from Pavel Emelyanov
On start the storage_service sets up initial tokens. Some dangling variables, checks and code duplication had accumulated over time. * xemul/br-storage-service-bootstrap-leftovers: dht: Use db::config to generate initial tookens database, dht: Move get_initial_tokens() storage_service: Factor out random/config tokens generation storage_service: No extra get_replace_address checks storage_service: Remove write-only local variable
This commit is contained in:
@@ -8,6 +8,10 @@
|
||||
* SPDX-License-Identifier: (AGPL-3.0-or-later and Apache-2.0)
|
||||
*/
|
||||
|
||||
#include <boost/algorithm/string/split.hpp>
|
||||
#include <boost/algorithm/string/erase.hpp>
|
||||
#include <boost/algorithm/string/classification.hpp>
|
||||
|
||||
#include <seastar/core/coroutine.hh>
|
||||
|
||||
#include "dht/boot_strapper.hh"
|
||||
@@ -58,24 +62,32 @@ future<> boot_strapper::bootstrap(streaming::stream_reason reason, gms::gossiper
|
||||
}
|
||||
}
|
||||
|
||||
std::unordered_set<token> boot_strapper::get_bootstrap_tokens(const token_metadata_ptr tmptr, replica::database& db) {
|
||||
auto initial_tokens = db.get_initial_tokens();
|
||||
std::unordered_set<token> boot_strapper::get_bootstrap_tokens(const token_metadata_ptr tmptr, const db::config& cfg, dht::check_token_endpoint check) {
|
||||
std::unordered_set<sstring> initial_tokens;
|
||||
sstring tokens_string = cfg.initial_token();
|
||||
try {
|
||||
boost::split(initial_tokens, tokens_string, boost::is_any_of(sstring(", ")));
|
||||
} catch (...) {
|
||||
throw std::runtime_error(format("Unable to parse initial_token={}", tokens_string));
|
||||
}
|
||||
initial_tokens.erase("");
|
||||
|
||||
// if user specified tokens, use those
|
||||
if (initial_tokens.size() > 0) {
|
||||
blogger.debug("tokens manually specified as {}", initial_tokens);
|
||||
std::unordered_set<token> tokens;
|
||||
for (auto& token_string : initial_tokens) {
|
||||
auto token = dht::token::from_sstring(token_string);
|
||||
if (tmptr->get_endpoint(token)) {
|
||||
if (check && tmptr->get_endpoint(token)) {
|
||||
throw std::runtime_error(format("Bootstrapping to existing token {} is not allowed (decommission/removenode the old node first).", token_string));
|
||||
}
|
||||
tokens.insert(token);
|
||||
}
|
||||
blogger.debug("Get manually specified bootstrap_tokens={}", tokens);
|
||||
blogger.info("Get manually specified bootstrap_tokens={}", tokens);
|
||||
return tokens;
|
||||
}
|
||||
|
||||
size_t num_tokens = db.get_config().num_tokens();
|
||||
size_t num_tokens = cfg.num_tokens();
|
||||
if (num_tokens < 1) {
|
||||
throw std::runtime_error("num_tokens must be >= 1");
|
||||
}
|
||||
@@ -85,7 +97,7 @@ std::unordered_set<token> boot_strapper::get_bootstrap_tokens(const token_metada
|
||||
}
|
||||
|
||||
auto tokens = get_random_tokens(std::move(tmptr), num_tokens);
|
||||
blogger.debug("Get random bootstrap_tokens={}", tokens);
|
||||
blogger.info("Get random bootstrap_tokens={}", tokens);
|
||||
return tokens;
|
||||
}
|
||||
|
||||
|
||||
@@ -19,9 +19,12 @@
|
||||
|
||||
namespace streaming { class stream_manager; }
|
||||
namespace gms { class gossiper; }
|
||||
namespace db { class config; }
|
||||
|
||||
namespace dht {
|
||||
|
||||
using check_token_endpoint = bool_class<struct check_token_endpoint_tag>;
|
||||
|
||||
class boot_strapper {
|
||||
using inet_address = gms::inet_address;
|
||||
using token_metadata = locator::token_metadata;
|
||||
@@ -52,7 +55,7 @@ public:
|
||||
* otherwise, if num_tokens == 1, pick a token to assume half the load of the most-loaded node.
|
||||
* else choose num_tokens tokens at random
|
||||
*/
|
||||
static std::unordered_set<token> get_bootstrap_tokens(const token_metadata_ptr tmptr, replica::database& db);
|
||||
static std::unordered_set<token> get_bootstrap_tokens(const token_metadata_ptr tmptr, const db::config& cfg, check_token_endpoint check);
|
||||
|
||||
static std::unordered_set<token> get_random_tokens(const token_metadata_ptr tmptr, size_t num_tokens);
|
||||
#if 0
|
||||
|
||||
@@ -20,9 +20,7 @@
|
||||
#include <seastar/core/coroutine.hh>
|
||||
#include <seastar/core/reactor.hh>
|
||||
#include <seastar/core/metrics.hh>
|
||||
#include <boost/algorithm/string/split.hpp>
|
||||
#include <boost/algorithm/string/erase.hpp>
|
||||
#include <boost/algorithm/string/classification.hpp>
|
||||
#include "sstables/sstables.hh"
|
||||
#include "sstables/sstables_manager.hh"
|
||||
#include "compaction/compaction.hh"
|
||||
@@ -1400,18 +1398,6 @@ database::query_mutations(schema_ptr s, const query::read_command& cmd, const dh
|
||||
co_return std::tuple(std::move(result), hit_rate);
|
||||
}
|
||||
|
||||
std::unordered_set<sstring> database::get_initial_tokens() {
|
||||
std::unordered_set<sstring> tokens;
|
||||
sstring tokens_string = get_config().initial_token();
|
||||
try {
|
||||
boost::split(tokens, tokens_string, boost::is_any_of(sstring(", ")));
|
||||
} catch (...) {
|
||||
throw std::runtime_error(format("Unable to parse initial_token={}", tokens_string));
|
||||
}
|
||||
tokens.erase("");
|
||||
return tokens;
|
||||
}
|
||||
|
||||
std::optional<gms::inet_address> database::get_replace_address() {
|
||||
auto& cfg = get_config();
|
||||
sstring replace_address = cfg.replace_address();
|
||||
|
||||
@@ -1573,7 +1573,6 @@ public:
|
||||
return _dirty_memory_manager.region_group();
|
||||
}
|
||||
|
||||
std::unordered_set<sstring> get_initial_tokens();
|
||||
std::optional<gms::inet_address> get_replace_address();
|
||||
bool is_replacing();
|
||||
|
||||
|
||||
@@ -221,8 +221,8 @@ void storage_service::prepare_to_join(
|
||||
}
|
||||
_bootstrap_tokens = prepare_replacement_info(initial_contact_nodes, loaded_peer_features).get0();
|
||||
auto replace_address = _db.local().get_replace_address();
|
||||
replacing_a_node_with_same_ip = replace_address && *replace_address == get_broadcast_address();
|
||||
replacing_a_node_with_diff_ip = replace_address && *replace_address != get_broadcast_address();
|
||||
replacing_a_node_with_same_ip = *replace_address == get_broadcast_address();
|
||||
replacing_a_node_with_diff_ip = *replace_address != get_broadcast_address();
|
||||
|
||||
slogger.info("Replacing a node with {} IP address, my address={}, node being replaced={}",
|
||||
get_broadcast_address() == *replace_address ? "the same" : "a different",
|
||||
@@ -366,7 +366,6 @@ void storage_service::join_token_ring(int delay) {
|
||||
//
|
||||
// We attempted to replace this with a schema-presence check, but you need a meaningful sleep
|
||||
// to get schema info from gossip which defeats the purpose. See CASSANDRA-4427 for the gory details.
|
||||
std::unordered_set<inet_address> current;
|
||||
if (should_bootstrap()) {
|
||||
bool resume_bootstrap = db::system_keyspace::bootstrap_in_progress();
|
||||
if (resume_bootstrap) {
|
||||
@@ -428,16 +427,14 @@ void storage_service::join_token_ring(int delay) {
|
||||
if (!_bootstrap_tokens.empty()) {
|
||||
slogger.info("Using previously saved tokens = {}", _bootstrap_tokens);
|
||||
} else {
|
||||
_bootstrap_tokens = boot_strapper::get_bootstrap_tokens(tmptr, _db.local());
|
||||
slogger.info("Using newly generated tokens = {}", _bootstrap_tokens);
|
||||
_bootstrap_tokens = boot_strapper::get_bootstrap_tokens(tmptr, _db.local().get_config(), dht::check_token_endpoint::yes);
|
||||
}
|
||||
} else {
|
||||
_bootstrap_tokens = boot_strapper::get_bootstrap_tokens(tmptr, _db.local());
|
||||
slogger.info("Using newly generated tokens = {}", _bootstrap_tokens);
|
||||
_bootstrap_tokens = boot_strapper::get_bootstrap_tokens(tmptr, _db.local().get_config(), dht::check_token_endpoint::yes);
|
||||
}
|
||||
} else {
|
||||
auto replace_addr = _db.local().get_replace_address();
|
||||
if (replace_addr && *replace_addr != get_broadcast_address()) {
|
||||
if (*replace_addr != get_broadcast_address()) {
|
||||
// Sleep additionally to make sure that the server actually is not alive
|
||||
// and giving it more time to gossip if alive.
|
||||
sleep_abortable(service::load_broadcaster::BROADCAST_INTERVAL, _abort_source).get();
|
||||
@@ -451,7 +448,6 @@ void storage_service::join_token_ring(int delay) {
|
||||
if (eps && eps->get_update_timestamp() > gms::gossiper::clk::now() - std::chrono::milliseconds(delay)) {
|
||||
throw std::runtime_error("Cannot replace a live node...");
|
||||
}
|
||||
current.insert(*existing);
|
||||
} else {
|
||||
throw std::runtime_error(format("Cannot replace token {} which does not exist!", token));
|
||||
}
|
||||
@@ -468,26 +464,12 @@ void storage_service::join_token_ring(int delay) {
|
||||
bootstrap(); // blocks until finished
|
||||
} else {
|
||||
maybe_start_sys_dist_ks();
|
||||
size_t num_tokens = _db.local().get_config().num_tokens();
|
||||
_bootstrap_tokens = db::system_keyspace::get_saved_tokens().get0();
|
||||
if (_bootstrap_tokens.empty()) {
|
||||
auto initial_tokens = _db.local().get_initial_tokens();
|
||||
if (initial_tokens.size() < 1) {
|
||||
_bootstrap_tokens = boot_strapper::get_random_tokens(get_token_metadata_ptr(), num_tokens);
|
||||
if (num_tokens == 1) {
|
||||
slogger.warn("Generated random token {}. Random tokens will result in an unbalanced ring; see http://wiki.apache.org/cassandra/Operations", _bootstrap_tokens);
|
||||
} else {
|
||||
slogger.info("Generated random tokens. tokens are {}", _bootstrap_tokens);
|
||||
}
|
||||
} else {
|
||||
for (auto token_string : initial_tokens) {
|
||||
auto token = dht::token::from_sstring(token_string);
|
||||
_bootstrap_tokens.insert(token);
|
||||
}
|
||||
slogger.info("Saved tokens not found. Using configuration value: {}", _bootstrap_tokens);
|
||||
}
|
||||
_bootstrap_tokens = boot_strapper::get_bootstrap_tokens(get_token_metadata_ptr(), _db.local().get_config(), dht::check_token_endpoint::no);
|
||||
db::system_keyspace::update_tokens(_bootstrap_tokens).get();
|
||||
} else {
|
||||
size_t num_tokens = _db.local().get_config().num_tokens();
|
||||
if (_bootstrap_tokens.size() != num_tokens) {
|
||||
throw std::runtime_error(format("Cannot change the number of tokens from {:d} to {:d}", _bootstrap_tokens.size(), num_tokens));
|
||||
} else {
|
||||
@@ -664,11 +646,9 @@ void storage_service::bootstrap() {
|
||||
set_mode(mode::JOINING, fmt::format("Wait until local node knows tokens of peer nodes"), true);
|
||||
_gossiper.wait_for_range_setup().get();
|
||||
auto replace_addr = _db.local().get_replace_address();
|
||||
if (replace_addr) {
|
||||
slogger.debug("Removing replaced endpoint {} from system.peers", *replace_addr);
|
||||
db::system_keyspace::remove_endpoint(*replace_addr).get();
|
||||
_group0->leave_group0(replace_addr).get();
|
||||
}
|
||||
slogger.debug("Removing replaced endpoint {} from system.peers", *replace_addr);
|
||||
db::system_keyspace::remove_endpoint(*replace_addr).get();
|
||||
_group0->leave_group0(replace_addr).get();
|
||||
}
|
||||
|
||||
_db.invoke_on_all([this] (replica::database& db) {
|
||||
|
||||
Reference in New Issue
Block a user