Merge "token bootstrap"

From Asias:

"With this series, simple replication strategy is supposed to work.

Start two nodes, after bootstrap, uuid and token are spread correctly
through gossip:

ep=127.0.0.2, eps=EndpointState: HeartBeatState = generation = 1433298647, version = 156, AppStateMap =
{ 0 : Value(NORMAL,TOKENS,30) }  { 5 : Value(urchin_1_0,4) }  { 8 : Value(,3) }  { 11 : Value(ms_1_0,1) }
{ 12 : Value(6127dd57-fb40-4aca-9046-c3509eca4d1e,2) }  { 13 : Value(acd66a3f2e5bb1af;d0046f49663e2d9b;43dcb3bd8dc7397b,29) }
ep=127.0.0.1, eps=EndpointState: HeartBeatState = generation = 1433298640, version = 161, AppStateMap =
{ 0 : Value(NORMAL,TOKENS,11) }  { 5 : Value(urchin_1_0,4) }  { 8 : Value(,3) }  { 11 : Value(ms_1_0,1) }
{ 12 : Value(a6d2ac36-2f0e-492f-8676-198bbbc42dd1,2) }  { 13 : Value(71e4bd2878ec2446;2170ecc473cd6240;e0f2574988bb909e,9) }

Endpoint -> Token
inet_address=127.0.0.2, token=ac d6 6a 3f 2e 5b b1 af
inet_address=127.0.0.2, token=d0 04 6f 49 66 3e 2d 9b
inet_address=127.0.0.1, token=e0 f2 57 49 88 bb 90 9e
inet_address=127.0.0.1, token=21 70 ec c4 73 cd 62 40
inet_address=127.0.0.2, token=43 dc b3 bd 8d c7 39 7b
inet_address=127.0.0.1, token=71 e4 bd 28 78 ec 24 46

Endpoint -> UUID
inet_address=127.0.0.2, uuid=6127dd57-fb40-4aca-9046-c3509eca4d1e
inet_address=127.0.0.1, uuid=a6d2ac36-2f0e-492f-8676-198bbbc42dd1

Sorted Token
token=ac d6 6a 3f 2e 5b b1 af
token=d0 04 6f 49 66 3e 2d 9b
token=e0 f2 57 49 88 bb 90 9e
token=21 70 ec c4 73 cd 62 40
token=43 dc b3 bd 8d c7 39 7b
token=71 e4 bd 28 78 ec 24 46"
This commit is contained in:
Avi Kivity
2015-06-04 12:46:08 +03:00
18 changed files with 856 additions and 863 deletions

View File

@@ -29,6 +29,7 @@
#include "frozen_mutation.hh"
#include "mutation_partition_applier.hh"
#include "core/do_with.hh"
#include "service/storage_service.hh"
thread_local logging::logger dblog("database");
@@ -452,7 +453,7 @@ column_family::seal_active_memtable(database* db) {
if (cl != nullptr) {
cl->discard_completed_segments(_schema->id(), old->replay_position());
}
_memtables->erase(boost::find(*_memtables, old));
_memtables->erase(boost::range::find(*_memtables, old));
} catch (std::exception& e) {
dblog.error("failed to write sstable: {}", e.what());
} catch (...) {
@@ -684,20 +685,10 @@ const column_family& database::find_column_family(const utils::UUID& uuid) const
void
keyspace::create_replication_strategy() {
static thread_local locator::token_metadata tm;
static locator::simple_snitch snitch;
static std::unordered_map<sstring, sstring> options = {{"replication_factor", "3"}};
auto d2t = [](double d) {
unsigned long l = net::hton(static_cast<unsigned long>(d*(std::numeric_limits<unsigned long>::max())));
std::array<int8_t, 8> a;
memcpy(a.data(), &l, 8);
return a;
};
tm.update_normal_token({dht::token::kind::key, {d2t(0).data(), 8}}, to_sstring("127.0.0.1"));
tm.update_normal_token({dht::token::kind::key, {d2t(1.0/4).data(), 8}}, to_sstring("127.0.0.2"));
tm.update_normal_token({dht::token::kind::key, {d2t(2.0/4).data(), 8}}, to_sstring("127.0.0.3"));
tm.update_normal_token({dht::token::kind::key, {d2t(3.0/4).data(), 8}}, to_sstring("127.0.0.4"));
_replication_strategy = locator::abstract_replication_strategy::create_replication_strategy(_metadata->name(), _metadata->strategy_name(), tm, snitch, options);
auto& ss = service::get_local_storage_service();
_replication_strategy = locator::abstract_replication_strategy::create_replication_strategy(_metadata->name(), _metadata->strategy_name(), ss.get_token_metadata(), snitch, options);
}
locator::abstract_replication_strategy&

View File

@@ -22,6 +22,7 @@
#include "gms/inet_address.hh"
#include "locator/token_metadata.hh"
#include "dht/i_partitioner.hh"
#include <unordered_set>
namespace dht {
@@ -76,7 +77,7 @@ public:
* otherwise, if num_tokens == 1, pick a token to assume half the load of the most-loaded node.
* else choose num_tokens tokens at random
*/
static std::set<token> get_bootstrap_tokens(token_metadata metadata) {
static std::unordered_set<token> get_bootstrap_tokens(token_metadata metadata) {
#if 0
Collection<String> initialTokens = DatabaseDescriptor.getInitialTokens();
// if user specified tokens, use those
@@ -106,8 +107,8 @@ public:
return get_random_tokens(metadata, num_tokens);
}
static std::set<token> get_random_tokens(token_metadata metadata, size_t num_tokens) {
std::set<token> tokens;
static std::unordered_set<token> get_random_tokens(token_metadata metadata, size_t num_tokens) {
std::unordered_set<token> tokens;
while (tokens.size() < num_tokens) {
auto token = global_partitioner().get_random_token();
auto ep = metadata.get_endpoint(token);

View File

@@ -138,7 +138,7 @@ std::ostream& operator<<(std::ostream& out, const token& t) {
auto flags = out.flags();
for (auto c : t._data) {
unsigned char x = c;
out << std::hex << +x << " ";
out << std::hex << std::setw(2) << std::setfill('0') << +x << " ";
}
out.flags(flags);
return out;

View File

@@ -61,7 +61,7 @@ public:
// [0x00, 0x80] == 1/512
// [0xff, 0x80] == 1 - 1/512
bytes _data;
token(kind k, bytes d) : _kind(k), _data(d) {
token(kind k, bytes d) : _kind(std::move(k)), _data(std::move(d)) {
}
};

View File

@@ -190,20 +190,7 @@ double failure_detector::get_phi_convict_threshold() {
}
bool failure_detector::is_alive(inet_address ep) {
if (ep.is_broadcast_address()) {
return true;
}
auto eps = get_local_gossiper().get_endpoint_state_for_endpoint(ep);
// we could assert not-null, but having isAlive fail screws a node over so badly that
// it's worth being defensive here so minor bugs don't cause disproportionate
// badness. (See CASSANDRA-1463 for an example).
if (eps) {
return eps->is_alive();
} else {
// logger.error("unknown endpoint {}", ep);
return false;
}
return get_local_gossiper().is_alive(ep);
}
void failure_detector::report(inet_address ep) {

View File

@@ -167,7 +167,6 @@ public:
extern distributed<failure_detector> _the_failure_detector;
inline failure_detector& get_local_failure_detector() {
assert(engine().cpu_id() == 0);
return _the_failure_detector.local();
}
inline distributed<failure_detector>& get_failure_detector() {

View File

@@ -787,9 +787,7 @@ utils::UUID gossiper::get_host_id(inet_address endpoint) {
throw std::runtime_error(sprint("Host %s does not use new-style tokens!", endpoint));
}
sstring uuid = get_endpoint_state_for_endpoint(endpoint)->get_application_state(application_state::HOST_ID)->value;
// FIXME: Add UUID(const sstring& id) constructor
warn(unimplemented::cause::GOSSIP);
return utils::UUID(0, 0);
return utils::UUID(uuid);
}
std::experimental::optional<endpoint_state> gossiper::get_state_for_version_bigger_than(inet_address for_endpoint, int version) {
@@ -1240,4 +1238,20 @@ void gossiper::debug_show() {
reporter->arm_periodic(std::chrono::milliseconds(1000));
}
// Return whether this gossiper currently considers |ep| to be alive.
// The local node is unconditionally alive; for anyone else we consult
// the endpoint state learned through gossip.
bool gossiper::is_alive(inet_address ep) {
    if (ep == get_broadcast_address()) {
        return true;
    }
    auto eps = get_endpoint_state_for_endpoint(ep);
    // Be defensive rather than asserting non-null: having is_alive fail
    // screws a node over so badly that minor bugs would cause
    // disproportionate badness (see CASSANDRA-1463 for an example).
    // An endpoint we know nothing about is reported dead.
    if (!eps) {
        // logger.error("unknown endpoint {}", ep);
        return false;
    }
    return eps->is_alive();
}
} // namespace gms

View File

@@ -360,6 +360,7 @@ private:
void handle_major_state_change(inet_address ep, endpoint_state eps);
public:
bool is_alive(inet_address ep);
bool is_dead_state(endpoint_state eps);
void apply_state_locally(std::map<inet_address, endpoint_state>& map);

View File

@@ -21,7 +21,6 @@
#pragma once
#include <vector>
#include "types.hh"
#include "core/sstring.hh"
#include "util/serialization.hh"
@@ -30,6 +29,8 @@
#include "gms/inet_address.hh"
#include "dht/i_partitioner.hh"
#include "to_string.hh"
#include <unordered_set>
#include <vector>
namespace gms {
@@ -125,17 +126,17 @@ public:
return versioned_value(value.value);
}
versioned_value bootstrapping(const std::set<token>& tokens) {
versioned_value bootstrapping(const std::unordered_set<token>& tokens) {
return versioned_value(version_string({sstring(versioned_value::STATUS_BOOTSTRAPPING),
make_token_string(tokens)}));
}
versioned_value normal(const std::set<token>& tokens) {
versioned_value normal(const std::unordered_set<token>& tokens) {
return versioned_value(version_string({sstring(versioned_value::STATUS_NORMAL),
make_token_string(tokens)}));
}
sstring make_token_string(const std::set<token>& tokens) {
sstring make_token_string(const std::unordered_set<token>& tokens) {
// FIXME:
// return partitioner.getTokenFactory().toString(Iterables.get(tokens, 0));
return "TOKENS";
@@ -151,19 +152,19 @@ public:
return versioned_value(new_version.to_sstring());
}
versioned_value leaving(const std::set<token>& tokens) {
versioned_value leaving(const std::unordered_set<token>& tokens) {
return versioned_value(version_string({sstring(versioned_value::STATUS_LEAVING),
make_token_string(tokens)}));
}
versioned_value left(const std::set<token>& tokens, long expireTime) {
versioned_value left(const std::unordered_set<token>& tokens, long expireTime) {
return versioned_value(version_string({sstring(versioned_value::STATUS_LEFT),
make_token_string(tokens),
std::to_string(expireTime)}));
}
versioned_value moving(token t) {
std::set<token> tokens = {t};
std::unordered_set<token> tokens = {t};
return versioned_value(version_string({sstring(versioned_value::STATUS_MOVING),
make_token_string(tokens)}));
}
@@ -173,7 +174,7 @@ public:
return versioned_value(hostId.to_sstring());
}
versioned_value tokens(const std::set<token> tokens) {
versioned_value tokens(const std::unordered_set<token> tokens) {
sstring tokens_string;
for (auto it = tokens.cbegin(); it != tokens.cend(); ) {
tokens_string += to_hex(it->_data);

View File

@@ -2,6 +2,7 @@
* Copyright (C) 2015 Cloudius Systems, Ltd.
*/
#include "utils/UUID.hh"
#include "token_metadata.hh"
#include <experimental/optional>
@@ -131,9 +132,13 @@ void token_metadata::debug_show() {
print("inet_address=%s, token=%s\n", x.second, x.first);
}
print("Endpoint -> UUID\n");
for (auto x: _endpoint_to_host_id_map) {
for (auto x : _endpoint_to_host_id_map) {
print("inet_address=%s, uuid=%s\n", x.first, x.second);
}
print("Sorted Token\n");
for (auto x : _sorted_tokens) {
print("token=%s\n", x);
}
});
reporter->arm_periodic(std::chrono::seconds(1));
}
@@ -160,5 +165,76 @@ void token_metadata::update_host_id(const UUID& host_id, inet_address endpoint)
_endpoint_to_host_id_map[endpoint] = host_id;
}
// Return the unique host ID registered for |endpoint|.
// Precondition (asserted): the endpoint was registered via update_host_id().
utils::UUID token_metadata::get_host_id(inet_address endpoint) {
    // Single hash lookup instead of count() followed by at().
    auto it = _endpoint_to_host_id_map.find(endpoint);
    assert(it != _endpoint_to_host_id_map.end());
    return it->second;
}
// Reverse-lookup: return the endpoint that owns |host_id|.
// Precondition (asserted): some endpoint maps to host_id.
// Linear scan — the map is keyed by endpoint, not host ID.
gms::inet_address token_metadata::get_endpoint_for_host_id(UUID host_id) {
    auto beg = _endpoint_to_host_id_map.cbegin();
    auto end = _endpoint_to_host_id_map.cend();
    // Take each entry by const reference: copying the pair per element
    // is wasted work.
    auto it = std::find_if(beg, end, [&host_id] (const auto& x) {
        return x.second == host_id;
    });
    assert(it != end);
    return it->first;
}
// Return a reference to the endpoint -> host ID map for read-only use.
// The reference is only valid while this token_metadata instance is alive;
// callers must not mutate the map through it.
const auto& token_metadata::get_endpoint_to_host_id_map_for_reading() {
    return _endpoint_to_host_id_map;
}
// Return true if |endpoint| owns at least one token in the ring, i.e. it
// appears as a value in the token -> endpoint map.  Linear scan — the map
// is keyed by token, not endpoint.
bool token_metadata::is_member(inet_address endpoint) {
    // any_of states the intent directly instead of find_if + end comparison.
    return std::any_of(_token_to_endpoint_map.cbegin(), _token_to_endpoint_map.cend(),
            [&endpoint] (const auto& x) {
        return x.second == endpoint;
    });
}
// Register a single pending bootstrap token for |endpoint|.
// Thin convenience wrapper around add_bootstrap_tokens().
void token_metadata::add_bootstrap_token(token t, inet_address endpoint) {
    add_bootstrap_tokens(std::unordered_set<token>{t}, endpoint);
}
void token_metadata::add_bootstrap_tokens(std::unordered_set<token> tokens, inet_address endpoint) {
for (auto t : tokens) {
auto old_endpoint = _bootstrap_tokens.find(t);
if (old_endpoint != _bootstrap_tokens.end() && (*old_endpoint).second != endpoint) {
auto msg = sprint("Bootstrap Token collision between %s and %s (token %s", (*old_endpoint).second, endpoint, t);
throw std::runtime_error(msg);
}
auto old_endpoint2 = _token_to_endpoint_map.find(t);
if (old_endpoint2 != _token_to_endpoint_map.end() && (*old_endpoint2).second != endpoint) {
auto msg = sprint("Bootstrap Token collision between %s and %s (token %s", (*old_endpoint2).second, endpoint, t);
throw std::runtime_error(msg);
}
}
// Unfortunately, std::remove_if does not work with std::map
for (auto it = _bootstrap_tokens.begin(); it != _bootstrap_tokens.end();) {
if ((*it).second == endpoint) {
it = _bootstrap_tokens.erase(it);
} else {
it++;
}
}
for (auto t : tokens) {
_bootstrap_tokens[t] = endpoint;
}
}
// Remove the given tokens from the pending-bootstrap map.
// Precondition (asserted): tokens is non-empty.
void token_metadata::remove_bootstrap_tokens(std::unordered_set<token> tokens) {
    assert(!tokens.empty());
    // const ref: a token owns a bytes buffer, so copying per iteration
    // is wasted allocation.
    for (const auto& t : tokens) {
        _bootstrap_tokens.erase(t);
    }
}
// Return true if |endpoint| is currently in the process of leaving the ring.
bool token_metadata::is_leaving(inet_address endpoint) {
    return _leaving_endpoints.find(endpoint) != _leaving_endpoints.end();
}
} // namespace locator

View File

@@ -53,6 +53,10 @@ private:
/** Maintains endpoint to host ID map of every node in the cluster */
std::unordered_map<inet_address, utils::UUID> _endpoint_to_host_id_map;
std::unordered_map<token, inet_address> _bootstrap_tokens;
std::unordered_set<inet_address> _leaving_endpoints;
std::unordered_map<token, inet_address> _moving_endpoints;
std::vector<token> _sorted_tokens;
topology _topology;
@@ -118,7 +122,7 @@ public:
private final ConcurrentMap<String, Multimap<Range<Token>, InetAddress>> pendingRanges = new ConcurrentHashMap<String, Multimap<Range<Token>, InetAddress>>();
// nodes which are migrating to the new tokens in the ring
private final Set<Pair<Token, InetAddress>> movingEndpoints = new HashSet<Pair<Token, InetAddress>>();
private final Set<Pair<Token, InetAddress>> _moving_endpoints = new HashSet<Pair<Token, InetAddress>>();
/* Use this lock for manipulating the token map */
private final ReadWriteLock lock = new ReentrantReadWriteLock(true);
@@ -165,7 +169,7 @@ public:
lock.readLock().lock();
try
{
for (Token token : bootstrapTokens.keySet())
for (Token token : _bootstrap_tokens.keySet())
for (Range<Token> range : sourceRanges)
if (range.contains(token))
n++;
@@ -216,10 +220,10 @@ public:
assert tokens != null && !tokens.isEmpty();
bootstrapTokens.removeValue(endpoint);
_bootstrap_tokens.removeValue(endpoint);
tokenToEndpointMap.removeValue(endpoint);
topology.addEndpoint(endpoint);
leavingEndpoints.remove(endpoint);
_leaving_endpoints.remove(endpoint);
removeFromMoving(endpoint); // also removing this endpoint from moving
for (Token token : tokens)
@@ -252,105 +256,22 @@ public:
* @param endpoint
*/
void update_host_id(const UUID& host_id, inet_address endpoint);
#if 0
/** Return the unique host ID for an end-point. */
public UUID getHostId(InetAddress endpoint)
{
lock.readLock().lock();
try
{
return _endpoint_to_host_id_map.get(endpoint);
}
finally
{
lock.readLock().unlock();
}
}
UUID get_host_id(inet_address endpoint);
/** Return the end-point for a unique host ID */
public InetAddress getEndpointForHostId(UUID hostId)
{
lock.readLock().lock();
try
{
return _endpoint_to_host_id_map.inverse().get(hostId);
}
finally
{
lock.readLock().unlock();
}
}
inet_address get_endpoint_for_host_id(UUID host_id);
/** @return a copy of the endpoint-to-id map for read-only operations */
public Map<InetAddress, UUID> getEndpointToHostIdMapForReading()
{
lock.readLock().lock();
try
{
Map<InetAddress, UUID> readMap = new HashMap<InetAddress, UUID>();
readMap.putAll(_endpoint_to_host_id_map);
return readMap;
}
finally
{
lock.readLock().unlock();
}
}
const auto& get_endpoint_to_host_id_map_for_reading();
@Deprecated
public void addBootstrapToken(Token token, InetAddress endpoint)
{
addBootstrapTokens(Collections.singleton(token), endpoint);
}
void add_bootstrap_token(token t, inet_address endpoint);
public void addBootstrapTokens(Collection<Token> tokens, InetAddress endpoint)
{
assert tokens != null && !tokens.isEmpty();
assert endpoint != null;
void add_bootstrap_tokens(std::unordered_set<token> tokens, inet_address endpoint);
lock.writeLock().lock();
try
{
InetAddress oldEndpoint;
for (Token token : tokens)
{
oldEndpoint = bootstrapTokens.get(token);
if (oldEndpoint != null && !oldEndpoint.equals(endpoint))
throw new RuntimeException("Bootstrap Token collision between " + oldEndpoint + " and " + endpoint + " (token " + token);
oldEndpoint = tokenToEndpointMap.get(token);
if (oldEndpoint != null && !oldEndpoint.equals(endpoint))
throw new RuntimeException("Bootstrap Token collision between " + oldEndpoint + " and " + endpoint + " (token " + token);
}
bootstrapTokens.removeValue(endpoint);
for (Token token : tokens)
bootstrapTokens.put(token, endpoint);
}
finally
{
lock.writeLock().unlock();
}
}
public void removeBootstrapTokens(Collection<Token> tokens)
{
assert tokens != null && !tokens.isEmpty();
lock.writeLock().lock();
try
{
for (Token token : tokens)
bootstrapTokens.remove(token);
}
finally
{
lock.writeLock().unlock();
}
}
void remove_bootstrap_tokens(std::unordered_set<token> tokens);
#if 0
public void addLeavingEndpoint(InetAddress endpoint)
{
@@ -359,7 +280,7 @@ public:
lock.writeLock().lock();
try
{
leavingEndpoints.add(endpoint);
_leaving_endpoints.add(endpoint);
}
finally
{
@@ -380,7 +301,7 @@ public:
try
{
movingEndpoints.add(Pair.create(token, endpoint));
_moving_endpoints.add(Pair.create(token, endpoint));
}
finally
{
@@ -395,10 +316,10 @@ public:
lock.writeLock().lock();
try
{
bootstrapTokens.removeValue(endpoint);
_bootstrap_tokens.removeValue(endpoint);
tokenToEndpointMap.removeValue(endpoint);
topology.removeEndpoint(endpoint);
leavingEndpoints.remove(endpoint);
_leaving_endpoints.remove(endpoint);
_endpoint_to_host_id_map.remove(endpoint);
sortedTokens = sortTokens();
invalidateCachedRings();
@@ -420,11 +341,11 @@ public:
lock.writeLock().lock();
try
{
for (Pair<Token, InetAddress> pair : movingEndpoints)
for (Pair<Token, InetAddress> pair : _moving_endpoints)
{
if (pair.right.equals(endpoint))
{
movingEndpoints.remove(pair);
_moving_endpoints.remove(pair);
break;
}
}
@@ -459,58 +380,21 @@ public:
return getTokens(endpoint).iterator().next();
}
public boolean isMember(InetAddress endpoint)
{
assert endpoint != null;
#endif
lock.readLock().lock();
try
{
return tokenToEndpointMap.inverse().containsKey(endpoint);
}
finally
{
lock.readLock().unlock();
}
}
bool is_member(inet_address endpoint);
public boolean isLeaving(InetAddress endpoint)
{
assert endpoint != null;
bool is_leaving(inet_address endpoint);
lock.readLock().lock();
try
{
return leavingEndpoints.contains(endpoint);
}
finally
{
lock.readLock().unlock();
}
}
public boolean isMoving(InetAddress endpoint)
{
assert endpoint != null;
lock.readLock().lock();
try
{
for (Pair<Token, InetAddress> pair : movingEndpoints)
{
if (pair.right.equals(endpoint))
return true;
bool is_moving(inet_address endpoint) {
for (auto x : _moving_endpoints) {
if (x.second == endpoint) {
return true;
}
return false;
}
finally
{
lock.readLock().unlock();
}
return false;
}
#if 0
private final AtomicReference<TokenMetadata> cachedTokenMap = new AtomicReference<TokenMetadata>();
/**
@@ -570,7 +454,7 @@ public:
{
TokenMetadata allLeftMetadata = cloneOnlyTokenMap();
for (InetAddress endpoint : leavingEndpoints)
for (InetAddress endpoint : _leaving_endpoints)
allLeftMetadata.removeEndpoint(endpoint);
return allLeftMetadata;
@@ -595,11 +479,11 @@ public:
{
TokenMetadata metadata = cloneOnlyTokenMap();
for (InetAddress endpoint : leavingEndpoints)
for (InetAddress endpoint : _leaving_endpoints)
metadata.removeEndpoint(endpoint);
for (Pair<Token, InetAddress> pair : movingEndpoints)
for (Pair<Token, InetAddress> pair : _moving_endpoints)
metadata.updateNormalToken(pair.left, pair.right);
return metadata;
@@ -704,7 +588,7 @@ public:
{
Multimap<Range<Token>, InetAddress> newPendingRanges = HashMultimap.create();
if (bootstrapTokens.isEmpty() && leavingEndpoints.isEmpty() && movingEndpoints.isEmpty())
if (_bootstrap_tokens.isEmpty() && _leaving_endpoints.isEmpty() && _moving_endpoints.isEmpty())
{
if (logger.isDebugEnabled())
logger.debug("No bootstrapping, leaving or moving nodes -> empty pending ranges for {}", keyspaceName);
@@ -720,7 +604,7 @@ public:
// get all ranges that will be affected by leaving nodes
Set<Range<Token>> affectedRanges = new HashSet<Range<Token>>();
for (InetAddress endpoint : leavingEndpoints)
for (InetAddress endpoint : _leaving_endpoints)
affectedRanges.addAll(addressRanges.get(endpoint));
// for each of those ranges, find what new nodes will be responsible for the range when
@@ -738,7 +622,7 @@ public:
// For each of the bootstrapping nodes, simply add and remove them one by one to
// allLeftMetadata and check in between what their ranges would be.
Multimap<InetAddress, Token> bootstrapAddresses = bootstrapTokens.inverse();
Multimap<InetAddress, Token> bootstrapAddresses = _bootstrap_tokens.inverse();
for (InetAddress endpoint : bootstrapAddresses.keySet())
{
Collection<Token> tokens = bootstrapAddresses.get(endpoint);
@@ -754,7 +638,7 @@ public:
// For each of the moving nodes, we do the same thing we did for bootstrapping:
// simply add and remove them one by one to allLeftMetadata and check in between what their ranges would be.
for (Pair<Token, InetAddress> moving : movingEndpoints)
for (Pair<Token, InetAddress> moving : _moving_endpoints)
{
InetAddress endpoint = moving.right; // address of the moving node
@@ -802,7 +686,7 @@ public:
lock.readLock().lock();
try
{
return new BiMultiValMap<Token, InetAddress>(bootstrapTokens);
return new BiMultiValMap<Token, InetAddress>(_bootstrap_tokens);
}
finally
{
@@ -823,13 +707,13 @@ public:
}
}
/** caller should not modify leavingEndpoints */
/** caller should not modify _leaving_endpoints */
public Set<InetAddress> getLeavingEndpoints()
{
lock.readLock().lock();
try
{
return ImmutableSet.copyOf(leavingEndpoints);
return ImmutableSet.copyOf(_leaving_endpoints);
}
finally
{
@@ -846,7 +730,7 @@ public:
lock.readLock().lock();
try
{
return ImmutableSet.copyOf(movingEndpoints);
return ImmutableSet.copyOf(_moving_endpoints);
}
finally
{
@@ -922,10 +806,10 @@ public:
{
tokenToEndpointMap.clear();
_endpoint_to_host_id_map.clear();
bootstrapTokens.clear();
leavingEndpoints.clear();
_bootstrap_tokens.clear();
_leaving_endpoints.clear();
pendingRanges.clear();
movingEndpoints.clear();
_moving_endpoints.clear();
sortedTokens.clear();
topology.clear();
invalidateCachedRings();
@@ -957,22 +841,22 @@ public:
}
}
if (!bootstrapTokens.isEmpty())
if (!_bootstrap_tokens.isEmpty())
{
sb.append("Bootstrapping Tokens:" );
sb.append(System.getProperty("line.separator"));
for (Map.Entry<Token, InetAddress> entry : bootstrapTokens.entrySet())
for (Map.Entry<Token, InetAddress> entry : _bootstrap_tokens.entrySet())
{
sb.append(entry.getValue()).append(":").append(entry.getKey());
sb.append(System.getProperty("line.separator"));
}
}
if (!leavingEndpoints.isEmpty())
if (!_leaving_endpoints.isEmpty())
{
sb.append("Leaving Endpoints:");
sb.append(System.getProperty("line.separator"));
for (InetAddress ep : leavingEndpoints)
for (InetAddress ep : _leaving_endpoints)
{
sb.append(ep);
sb.append(System.getProperty("line.separator"));
@@ -1060,9 +944,9 @@ public:
lock.readLock().lock();
try
{
Map<Token, InetAddress> map = new HashMap<Token, InetAddress>(tokenToEndpointMap.size() + bootstrapTokens.size());
Map<Token, InetAddress> map = new HashMap<Token, InetAddress>(tokenToEndpointMap.size() + _bootstrap_tokens.size());
map.putAll(tokenToEndpointMap);
map.putAll(bootstrapTokens);
map.putAll(_bootstrap_tokens);
return map;
}
finally

View File

@@ -28,7 +28,7 @@ read_config(bpo::variables_map& opts, db::config& cfg) {
}
future<> init_storage_service() {
return service::get_storage_service().start_single().then([] {
return service::get_storage_service().start().then([] {
print("Start Storage service ...\n");
});
}
@@ -45,7 +45,7 @@ future<> init_messaging_service(auto listen_address, auto seed_provider) {
return net::get_messaging_service().start(listen).then([seeds] {
auto& ms = net::get_local_messaging_service();
print("Messaging server listening on ip %s port %d ...\n", ms.listen_address(), ms.port());
return gms::get_failure_detector().start_single().then([seeds] {
return gms::get_failure_detector().start().then([seeds] {
return gms::get_gossiper().start_single().then([seeds] {
auto& gossiper = gms::get_local_gossiper();
gossiper.set_seeds(seeds);

View File

@@ -20,7 +20,7 @@ bool storage_service::should_bootstrap() {
}
future<> storage_service::prepare_to_join() {
if (!joined) {
if (!_joined) {
std::map<gms::application_state, gms::versioned_value> app_states;
#if 0
if (DatabaseDescriptor.isReplacing() && !(Boolean.parseBoolean(System.getProperty("cassandra.join_ring", "true"))))
@@ -89,9 +89,8 @@ future<> storage_service::prepare_to_join() {
future<> storage_service::join_token_ring(int delay) {
auto f = make_ready_future<>();
_joined = true;
#if 0
joined = true;
// We bootstrap if we haven't successfully bootstrapped before, as long as we are not a seed.
// If we are a seed, or if the user manually sets auto_bootstrap to false,
// we'll skip streaming data from other nodes and jump directly into the ring.
@@ -212,7 +211,7 @@ future<> storage_service::join_token_ring(int delay) {
}
bootstrap(_bootstrap_tokens);
assert !isBootstrapMode; // bootstrap will block until finished
assert !_is_bootstrap_mode; // bootstrap will block until finished
#endif
} else {
// FIXME: DatabaseDescriptor.getNumTokens()
@@ -256,7 +255,7 @@ future<> storage_service::join_token_ring(int delay) {
if (Schema.instance.getKSMetaData(TraceKeyspace.NAME) == null)
MigrationManager.announceNewKeyspace(TraceKeyspace.definition(), 0, false);
if (!isSurveyMode)
if (!_is_survey_mode)
{
// start participating in the ring.
SystemKeyspace.setBootstrapState(SystemKeyspace.BootstrapState.COMPLETED);
@@ -277,8 +276,32 @@ future<> storage_service::join_token_ring(int delay) {
});
}
future<> storage_service::bootstrap(std::set<token> tokens) {
// isBootstrapMode = true;
// Operator-requested ring join.  The ported Java logic below is still
// disabled (#if 0): it would either perform the full token-ring join, or —
// if the node is in write-survey mode — promote it to a full ring member
// using its saved tokens.  Currently a no-op.  TODO: port the disabled code.
void storage_service::join_ring() {
#if 0
    if (!joined) {
        logger.info("Joining ring by operator request");
        try
        {
            joinTokenRing(0);
        }
        catch (ConfigurationException e)
        {
            throw new IOException(e.getMessage());
        }
    } else if (_is_survey_mode) {
        set_tokens(SystemKeyspace.getSavedTokens());
        SystemKeyspace.setBootstrapState(SystemKeyspace.BootstrapState.COMPLETED);
        _is_survey_mode = false;
        logger.info("Leaving write survey mode and joining ring at operator request");
        assert _token_metadata.sortedTokens().size() > 0;
        Auth.setup();
    }
#endif
}
future<> storage_service::bootstrap(std::unordered_set<token> tokens) {
_is_bootstrap_mode = true;
// SystemKeyspace.updateTokens(tokens); // DON'T use setToken, that makes us part of the ring locally which is incorrect until we are done bootstrapping
// FIXME: DatabaseDescriptor.isReplacing()
auto is_replacing = false;
@@ -309,5 +332,586 @@ future<> storage_service::bootstrap(std::set<token> tokens) {
});
}
// Handle a gossip notification that |endpoint| has entered the
// BOOTSTRAPPING state: record its gossiped tokens as pending bootstrap
// tokens in the token metadata, and remember its host ID if it gossips one.
void storage_service::handle_state_bootstrap(inet_address endpoint) {
    ss_debug("SS::handle_state_bootstrap endpoint=%s\n", endpoint);
    // explicitly check for TOKENS, because a bootstrapping node might be bootstrapping in legacy mode; that is, not using vnodes and no token specified
    auto tokens = get_tokens_for(endpoint);

    // if (logger.isDebugEnabled())
    //     logger.debug("Node {} state bootstrapping, token {}", endpoint, tokens);

    // if this node is present in token metadata, either we have missed intermediate states
    // or the node had crashed. Print warning if needed, clear obsolete stuff and
    // continue.
    if (_token_metadata.is_member(endpoint)) {
        // If isLeaving is false, we have missed both LEAVING and LEFT. However, if
        // isLeaving is true, we have only missed LEFT. Waiting time between completing
        // leave operation and rebootstrapping is relatively short, so the latter is quite
        // common (not enough time for gossip to spread). Therefore we report only the
        // former in the log.
        if (!_token_metadata.is_leaving(endpoint)) {
            // logger.info("Node {} state jump to bootstrap", endpoint);
        }
        // _token_metadata.removeEndpoint(endpoint);
    }

    _token_metadata.add_bootstrap_tokens(tokens, endpoint);
    // FIXME
    // PendingRangeCalculatorService.instance.update();
    auto& gossiper = gms::get_local_gossiper();
    // Track the endpoint's host ID alongside its pending tokens, when available.
    if (gossiper.uses_host_id(endpoint)) {
        _token_metadata.update_host_id(gossiper.get_host_id(endpoint), endpoint);
    }
}
// Handle a gossip notification that |endpoint| has entered the NORMAL
// state, i.e. it is a full ring member owning the tokens it gossips.
// Updates the host-ID mapping and the token ownership recorded in
// _token_metadata; ownership conflicts between endpoints claiming the
// same token are resolved by endpoint startup time.
void storage_service::handle_state_normal(inet_address endpoint) {
    // Fixed: this message previously said "handle_state_bootstrap"
    // (copy/paste error), making the debug trace misleading.
    ss_debug("SS::handle_state_normal endpoint=%s\n", endpoint);
    auto tokens = get_tokens_for(endpoint);
    auto& gossiper = gms::get_local_gossiper();

    std::unordered_set<token> tokensToUpdateInMetadata;
    std::unordered_set<token> tokensToUpdateInSystemKeyspace;
    std::unordered_set<token> localTokensToRemove;
    std::unordered_set<inet_address> endpointsToRemove;

    // if (logger.isDebugEnabled())
    //     logger.debug("Node {} state normal, token {}", endpoint, tokens);

    if (_token_metadata.is_member(endpoint)) {
        // logger.info("Node {} state jump to normal", endpoint);
    }
    update_peer_info(endpoint);
#if 1
    // Order Matters, TM.updateHostID() should be called before TM.updateNormalToken(), (see CASSANDRA-4300).
    if (gossiper.uses_host_id(endpoint)) {
        auto host_id = gossiper.get_host_id(endpoint);
        //inet_address existing = _token_metadata.get_endpoint_for_host_id(host_id);
        // if (DatabaseDescriptor.isReplacing() &&
        //     Gossiper.instance.getEndpointStateForEndpoint(DatabaseDescriptor.getReplaceAddress()) != null &&
        //     (hostId.equals(Gossiper.instance.getHostId(DatabaseDescriptor.getReplaceAddress())))) {
        if (false) {
            // logger.warn("Not updating token metadata for {} because I am replacing it", endpoint);
        } else {
            if (false /*existing != null && !existing.equals(endpoint)*/) {
#if 0
                if (existing == get_broadcast_address()) {
                    logger.warn("Not updating host ID {} for {} because it's mine", hostId, endpoint);
                    _token_metadata.removeEndpoint(endpoint);
                    endpointsToRemove.add(endpoint);
                } else if (gossiper.compare_endpoint_startup(endpoint, existing) > 0) {
                    logger.warn("Host ID collision for {} between {} and {}; {} is the new owner", hostId, existing, endpoint, endpoint);
                    _token_metadata.removeEndpoint(existing);
                    endpointsToRemove.add(existing);
                    _token_metadata.update_host_id(hostId, endpoint);
                } else {
                    logger.warn("Host ID collision for {} between {} and {}; ignored {}", hostId, existing, endpoint, endpoint);
                    _token_metadata.removeEndpoint(endpoint);
                    endpointsToRemove.add(endpoint);
                }
#endif
            } else {
                _token_metadata.update_host_id(host_id, endpoint);
            }
        }
    }
#endif
    // Decide, per token, whether |endpoint| becomes (or remains) the owner.
    // const ref: a token owns a bytes buffer, copying per iteration wastes work.
    for (const auto& t : tokens) {
        // we don't want to update if this node is responsible for the token and it has a later startup time than endpoint.
        auto current_owner = _token_metadata.get_endpoint(t);
        if (!current_owner) {
            // logger.debug("New node {} at token {}", endpoint, t);
            tokensToUpdateInMetadata.insert(t);
            tokensToUpdateInSystemKeyspace.insert(t);
        } else if (endpoint == *current_owner) {
            // set state back to normal, since the node may have tried to leave, but failed and is now back up
            tokensToUpdateInMetadata.insert(t);
            tokensToUpdateInSystemKeyspace.insert(t);
        } else if (gossiper.compare_endpoint_startup(endpoint, *current_owner) > 0) {
            tokensToUpdateInMetadata.insert(t);
            tokensToUpdateInSystemKeyspace.insert(t);
#if 0
            // currentOwner is no longer current, endpoint is. Keep track of these moves, because when
            // a host no longer has any tokens, we'll want to remove it.
            Multimap<InetAddress, Token> epToTokenCopy = getTokenMetadata().getEndpointToTokenMapForReading();
            epToTokenCopy.get(currentOwner).remove(token);
            if (epToTokenCopy.get(currentOwner).size() < 1)
                endpointsToRemove.add(currentOwner);

            logger.info(String.format("Nodes %s and %s have the same token %s. %s is the new owner",
                                       endpoint,
                                       currentOwner,
                                       token,
                                       endpoint));
#endif
        } else {
#if 0
            logger.info(String.format("Nodes %s and %s have the same token %s. Ignoring %s",
                                       endpoint,
                                       currentOwner,
                                       token,
                                       endpoint));
#endif
        }
    }

    bool is_moving = _token_metadata.is_moving(endpoint); // capture because updateNormalTokens clears moving status
    _token_metadata.update_normal_tokens(tokensToUpdateInMetadata, endpoint);

    // for (auto ep : endpointsToRemove) {
    //     removeEndpoint(ep);
    //     if (DatabaseDescriptor.isReplacing() && DatabaseDescriptor.getReplaceAddress().equals(ep))
    //         Gossiper.instance.replacementQuarantine(ep); // quarantine locally longer than normally; see CASSANDRA-8260
    // }
    if (!tokensToUpdateInSystemKeyspace.empty()) {
        // SystemKeyspace.updateTokens(endpoint, tokensToUpdateInSystemKeyspace);
    }
    if (!localTokensToRemove.empty()) {
        // SystemKeyspace.updateLocalTokens(Collections.<Token>emptyList(), localTokensToRemove);
    }

    if (is_moving) {
        // _token_metadata.remove_from_moving(endpoint);
        // for (IEndpointLifecycleSubscriber subscriber : lifecycleSubscribers)
        //     subscriber.onMove(endpoint);
    } else {
        // for (IEndpointLifecycleSubscriber subscriber : lifecycleSubscribers)
        //     subscriber.onJoinCluster(endpoint);
    }

    // PendingRangeCalculatorService.instance.update();
}
// Handle gossip STATUS = LEAVING for @endpoint: mark it as a leaving member
// in token metadata so pending ranges are computed correctly.
// FIXME: not yet ported -- the original Cassandra (Java) implementation is
// kept disabled under "#if 0" for reference, so this is currently a no-op.
void storage_service::handle_state_leaving(inet_address endpoint) {
#if 0
    Collection<Token> tokens;
    tokens = get_tokens_for(endpoint);

    if (logger.isDebugEnabled())
        logger.debug("Node {} state leaving, tokens {}", endpoint, tokens);

    // If the node is previously unknown or tokens do not match, update tokenmetadata to
    // have this node as 'normal' (it must have been using this token before the
    // leave). This way we'll get pending ranges right.
    if (!_token_metadata.isMember(endpoint))
    {
        logger.info("Node {} state jump to leaving", endpoint);
        _token_metadata.updateNormalTokens(tokens, endpoint);
    }
    else if (!_token_metadata.getTokens(endpoint).containsAll(tokens))
    {
        logger.warn("Node {} 'leaving' token mismatch. Long network partition?", endpoint);
        _token_metadata.updateNormalTokens(tokens, endpoint);
    }

    // at this point the endpoint is certainly a member with this token, so let's proceed
    // normally
    _token_metadata.addLeavingEndpoint(endpoint);

    PendingRangeCalculatorService.instance.update();
#endif
}
// Handle gossip STATUS = LEFT for @endpoint: the node has permanently left
// the ring, so its tokens must be excised from token metadata.
// FIXME: not yet ported -- the original Cassandra (Java) implementation is
// kept disabled under "#if 0" for reference, so this is currently a no-op.
void storage_service::handle_state_left(inet_address endpoint, std::vector<sstring> pieces) {
#if 0
    assert pieces.length >= 2;
    Collection<Token> tokens;
    tokens = get_tokens_for(endpoint);

    if (logger.isDebugEnabled())
        logger.debug("Node {} state left, tokens {}", endpoint, tokens);

    excise(tokens, endpoint, extractExpireTime(pieces));
#endif
}
// Handle gossip STATUS = MOVING for @endpoint: the node is relocating to the
// token encoded in pieces[1] and must be tracked as a moving endpoint.
// FIXME: not yet ported -- the original Cassandra (Java) implementation is
// kept disabled under "#if 0" for reference, so this is currently a no-op.
void storage_service::handle_state_moving(inet_address endpoint, std::vector<sstring> pieces) {
#if 0
    assert pieces.length >= 2;
    Token token = getPartitioner().getTokenFactory().fromString(pieces[1]);

    if (logger.isDebugEnabled())
        logger.debug("Node {} state moving, new token {}", endpoint, token);

    _token_metadata.addMovingEndpoint(token, endpoint);

    PendingRangeCalculatorService.instance.update();
#endif
}
// Handle gossip STATUS = removing/removed for @endpoint: another node is
// coordinating the removal of this (possibly dead) member from the ring.
// FIXME: not yet ported -- the original Cassandra (Java) implementation is
// kept disabled under "#if 0" for reference, so this is currently a no-op.
void storage_service::handle_state_removing(inet_address endpoint, std::vector<sstring> pieces) {
#if 0
    assert (pieces.length > 0);

    if (endpoint.equals(FBUtilities.getBroadcastAddress()))
    {
        logger.info("Received removenode gossip about myself. Is this node rejoining after an explicit removenode?");
        try
        {
            drain();
        }
        catch (Exception e)
        {
            throw new RuntimeException(e);
        }
        return;
    }
    if (_token_metadata.isMember(endpoint))
    {
        String state = pieces[0];
        Collection<Token> removeTokens = _token_metadata.getTokens(endpoint);

        if (VersionedValue.REMOVED_TOKEN.equals(state))
        {
            excise(removeTokens, endpoint, extractExpireTime(pieces));
        }
        else if (VersionedValue.REMOVING_TOKEN.equals(state))
        {
            if (logger.isDebugEnabled())
                logger.debug("Tokens {} removed manually (endpoint was {})", removeTokens, endpoint);

            // Note that the endpoint is being removed
            _token_metadata.addLeavingEndpoint(endpoint);
            PendingRangeCalculatorService.instance.update();

            // find the endpoint coordinating this removal that we need to notify when we're done
            String[] coordinator = Gossiper.instance.getEndpointStateForEndpoint(endpoint).getApplicationState(ApplicationState.REMOVAL_COORDINATOR).value.split(VersionedValue.DELIMITER_STR, -1);
            UUID hostId = UUID.fromString(coordinator[1]);
            // grab any data we are now responsible for and notify responsible node
            restoreReplicaCount(endpoint, _token_metadata.getEndpointForHostId(hostId));
        }
    }
    else // now that the gossiper has told us about this nonexistent member, notify the gossiper to remove it
    {
        if (VersionedValue.REMOVED_TOKEN.equals(pieces[0]))
            addExpireTimeIfFound(endpoint, extractExpireTime(pieces));
        removeEndpoint(endpoint);
    }
#endif
}
// Gossiper callback: a new endpoint has joined the cluster.  Replay every
// application state it carries through on_change() so our local view
// (token metadata, peer info) is brought up to date in one pass.
void storage_service::on_join(gms::inet_address endpoint, gms::endpoint_state ep_state) {
    ss_debug("SS::on_join endpoint=%s\n", endpoint);
    for (auto t : get_tokens_for(endpoint)) {
        ss_debug("t=%s\n", t);
    }
    for (auto& entry : ep_state.get_application_state_map()) {
        on_change(endpoint, entry.first, entry.second);
    }
    // MigrationManager.instance.scheduleSchemaPull(endpoint, epState);
}
// Gossiper callback: @endpoint has been marked alive again.
// FIXME: schema pull, hint delivery and lifecycle-subscriber notification
// are not yet ported; the original Java logic is kept under "#if 0".
void storage_service::on_alive(gms::inet_address endpoint, gms::endpoint_state state) {
    ss_debug("SS::on_alive endpoint=%s\n", endpoint);
#if 0
    MigrationManager.instance.scheduleSchemaPull(endpoint, state);

    if (_token_metadata.isMember(endpoint))
    {
        HintedHandOffManager.instance.scheduleHintDelivery(endpoint, true);
        for (IEndpointLifecycleSubscriber subscriber : lifecycleSubscribers)
            subscriber.onUp(endpoint);
    }
#endif
}
// Gossiper callback invoked just before an application state is applied.
// Intentionally a no-op: storage_service only reacts to committed state
// changes, via on_change().
void storage_service::before_change(gms::inet_address endpoint, gms::endpoint_state current_state, gms::application_state new_state_key, gms::versioned_value new_value) {
    // no-op
}
// Gossiper callback: a single application state for @endpoint changed.
//
// STATUS changes drive the ring state machine: the value is a
// delimiter-separated string whose first field selects the transition
// (bootstrap / normal / leaving / left / moving / removing).  Any other
// state is peer metadata; its persistence is not ported yet (the intended
// system-keyspace writes are preserved as comments), and changes from dead
// or unknown endpoints are ignored outright.  Finally the (possibly
// updated) token metadata is replicated to all shards.
void storage_service::on_change(inet_address endpoint, application_state state, versioned_value value) {
    ss_debug("SS::on_change endpoint=%s\n", endpoint);
    if (state == application_state::STATUS) {
        std::vector<sstring> parts;
        boost::split(parts, value.value, boost::is_any_of(sstring(versioned_value::DELIMITER_STR)));
        assert(!parts.empty());
        sstring move_name = parts[0];
        if (move_name == sstring(versioned_value::STATUS_BOOTSTRAPPING)) {
            handle_state_bootstrap(endpoint);
        } else if (move_name == sstring(versioned_value::STATUS_NORMAL)) {
            handle_state_normal(endpoint);
        } else if (move_name == sstring(versioned_value::REMOVING_TOKEN) ||
                   move_name == sstring(versioned_value::REMOVED_TOKEN)) {
            handle_state_removing(endpoint, parts);
        } else if (move_name == sstring(versioned_value::STATUS_LEAVING)) {
            handle_state_leaving(endpoint);
        } else if (move_name == sstring(versioned_value::STATUS_LEFT)) {
            handle_state_left(endpoint, parts);
        } else if (move_name == sstring(versioned_value::STATUS_MOVING)) {
            handle_state_moving(endpoint, parts);
        }
    } else {
        auto& gossiper = gms::get_local_gossiper();
        auto ep_state = gossiper.get_endpoint_state_for_endpoint(endpoint);
        if (!ep_state || gossiper.is_dead_state(*ep_state)) {
            // logger.debug("Ignoring state change for dead or unknown endpoint: {}", endpoint);
            return;
        }
        if (state == application_state::RELEASE_VERSION) {
            // SystemKeyspace.updatePeerInfo(endpoint, "release_version", value.value);
        } else if (state == application_state::DC) {
            // SystemKeyspace.updatePeerInfo(endpoint, "data_center", value.value);
        } else if (state == application_state::RACK) {
            // SystemKeyspace.updatePeerInfo(endpoint, "rack", value.value);
        } else if (state == application_state::RPC_ADDRESS) {
            // try {
            //     SystemKeyspace.updatePeerInfo(endpoint, "rpc_address", InetAddress.getByName(value.value));
            // } catch (UnknownHostException e) {
            //     throw new RuntimeException(e);
            // }
        } else if (state == application_state::SCHEMA) {
            // SystemKeyspace.updatePeerInfo(endpoint, "schema_version", UUID.fromString(value.value));
            // MigrationManager.instance.scheduleSchemaPull(endpoint, epState);
        } else if (state == application_state::HOST_ID) {
            // SystemKeyspace.updatePeerInfo(endpoint, "host_id", UUID.fromString(value.value));
        }
    }
    replicate_to_all_cores();
}
// Gossiper callback: @endpoint has been removed from the cluster.
// FIXME: not yet ported; the original Java logic is kept under "#if 0",
// so this is currently a no-op.
void storage_service::on_remove(gms::inet_address endpoint) {
#if 0
    _token_metadata.removeEndpoint(endpoint);
    PendingRangeCalculatorService.instance.update();
#endif
}
// Gossiper callback: @endpoint has been marked dead.
// FIXME: not yet ported; the original Java logic is kept under "#if 0",
// so this is currently a no-op.
void storage_service::on_dead(gms::inet_address endpoint, gms::endpoint_state state) {
#if 0
    MessagingService.instance().convict(endpoint);
    for (IEndpointLifecycleSubscriber subscriber : lifecycleSubscribers)
        subscriber.onDown(endpoint);
#endif
}
// Gossiper callback: @endpoint restarted (new generation observed).
// FIXME: not yet ported; the original Java logic is kept under "#if 0",
// so this is currently a no-op.
void storage_service::on_restart(gms::inet_address endpoint, gms::endpoint_state state) {
#if 0
    // If we have restarted before the node was even marked down, we need to reset the connection pool
    if (state.isAlive())
        onDead(endpoint, state);
#endif
}
// Persist peer metadata gossiped by @endpoint into the system keyspace.
// FIXME: the actual writes are not yet ported; the mapping from each
// application state to its peers-table column is preserved as comments.
// Until then this walks the state map but has no side effects.
void storage_service::update_peer_info(gms::inet_address endpoint) {
    using namespace gms;
    auto& gossiper = gms::get_local_gossiper();
    auto ep_state = gossiper.get_endpoint_state_for_endpoint(endpoint);
    if (!ep_state) {
        // Nothing is known about this endpoint yet.
        return;
    }
    for (auto& e : ep_state->get_application_state_map()) {
        auto& app_state = e.first;
        //auto& value = e.second.value
        if (app_state == application_state::RELEASE_VERSION) {
            // SystemKeyspace.updatePeerInfo(endpoint, "release_version", value);
        } else if (app_state == application_state::DC) {
            // SystemKeyspace.updatePeerInfo(endpoint, "data_center", value);
        } else if (app_state == application_state::RACK) {
            // SystemKeyspace.updatePeerInfo(endpoint, "rack", value);
        } else if (app_state == application_state::RPC_ADDRESS) {
            // SystemKeyspace.updatePeerInfo(endpoint, "rpc_address", InetAddress.getByName(value));
        } else if (app_state == application_state::SCHEMA) {
            // SystemKeyspace.updatePeerInfo(endpoint, "schema_version", UUID.fromString(value));
        } else if (app_state == application_state::HOST_ID) {
            // SystemKeyspace.updatePeerInfo(endpoint, "host_id", UUID.fromString(value));
        }
    }
}
// Return the raw string value of @appstate as currently gossiped by
// @endpoint, or an empty string when either the endpoint or the state
// is unknown to the local gossiper.
sstring storage_service::get_application_state_value(inet_address endpoint, application_state appstate) {
    auto& gossiper = gms::get_local_gossiper();
    auto eps = gossiper.get_endpoint_state_for_endpoint(endpoint);
    if (!eps) {
        return sstring();
    }
    auto val = eps->get_application_state(appstate);
    return val ? val->value : sstring();
}
// Parse the TOKENS application state gossiped by @endpoint into a token set.
// The gossiped value is a ';'-separated list of hex-encoded token keys
// (see the TOKENS entries in the gossip AppStateMap).
// Returns an empty set when the endpoint gossiped no TOKENS value.
std::unordered_set<locator::token> storage_service::get_tokens_for(inet_address endpoint) {
    auto tokens_string = get_application_state_value(endpoint, application_state::TOKENS);
    ss_debug("endpoint=%s, tokens_string=%s\n", endpoint, tokens_string);
    std::vector<sstring> tokens;
    std::unordered_set<token> ret;
    boost::split(tokens, tokens_string, boost::is_any_of(";"));
    for (const auto& str : tokens) {
        // boost::split() produces a single empty fragment for an empty input
        // (and one per stray delimiter); skip them so we do not fabricate a
        // bogus token with an empty key.
        if (str.empty()) {
            continue;
        }
        ss_debug("token=%s\n", str);
        sstring_view sv(str);
        bytes b = from_hex(sv);
        ret.emplace(token::kind::key, b);
    }
    return ret;
}
// Record @tokens as this node's own normal tokens: update the local ring
// view, then announce the bootstrap tokens with NORMAL status through
// gossip, and push the new token metadata to all shards.
void storage_service::set_tokens(std::unordered_set<token> tokens) {
    // if (logger.isDebugEnabled())
    //     logger.debug("Setting tokens to {}", tokens);
    // SystemKeyspace.updateTokens(tokens);
    auto local_address = get_broadcast_address();
    for (auto& t : tokens) {
        _token_metadata.update_normal_token(t, local_address);
    }
    // Collection<Token> localTokens = getLocalTokens();
    auto local_tokens = _bootstrap_tokens;
    auto& gossiper = gms::get_local_gossiper();
    // Order matters: TOKENS without STATUS is acceptable, not vice versa.
    gossiper.add_local_application_state(gms::application_state::TOKENS, value_factory.tokens(local_tokens));
    gossiper.add_local_application_state(gms::application_state::STATUS, value_factory.normal(local_tokens));
    //setMode(Mode.NORMAL, false);
    replicate_to_all_cores();
}
// Start the storage service: mark it initialized, then asynchronously join
// the gossip/token ring, waiting @delay milliseconds for ring information.
// Returns a future that resolves once the join sequence completes.
//
// FIXME: most of Cassandra's original start-up work -- loading persisted
// ring state, the drain-on-shutdown hook, counter-cache init and the
// "don't join ring" survey mode -- is not yet ported; the Java
// implementation is preserved under "#if 0" for reference.
future<> storage_service::init_server(int delay) {
#if 0
    logger.info("Cassandra version: {}", FBUtilities.getReleaseVersionString());
    logger.info("Thrift API version: {}", cassandraConstants.VERSION);
    logger.info("CQL supported versions: {} (default: {})", StringUtils.join(ClientState.getCQLSupportedVersion(), ","), ClientState.DEFAULT_CQL_VERSION);
#endif
    _initialized = true;
#if 0
    try
    {
        // Ensure StorageProxy is initialized on start-up; see CASSANDRA-3797.
        Class.forName("org.apache.cassandra.service.StorageProxy");
        // also IndexSummaryManager, which is otherwise unreferenced
        Class.forName("org.apache.cassandra.io.sstable.IndexSummaryManager");
    }
    catch (ClassNotFoundException e)
    {
        throw new AssertionError(e);
    }

    if (Boolean.parseBoolean(System.getProperty("cassandra.load_ring_state", "true")))
    {
        logger.info("Loading persisted ring state");
        Multimap<InetAddress, Token> loadedTokens = SystemKeyspace.loadTokens();
        Map<InetAddress, UUID> loadedHostIds = SystemKeyspace.loadHostIds();
        for (InetAddress ep : loadedTokens.keySet())
        {
            if (ep.equals(FBUtilities.getBroadcastAddress()))
            {
                // entry has been mistakenly added, delete it
                SystemKeyspace.removeEndpoint(ep);
            }
            else
            {
                _token_metadata.updateNormalTokens(loadedTokens.get(ep), ep);
                if (loadedHostIds.containsKey(ep))
                    _token_metadata.update_host_id(loadedHostIds.get(ep), ep);
                Gossiper.instance.addSavedEndpoint(ep);
            }
        }
    }

    // daemon threads, like our executors', continue to run while shutdown hooks are invoked
    drainOnShutdown = new Thread(new WrappedRunnable()
    {
        @Override
        public void runMayThrow() throws InterruptedException
        {
            ExecutorService counterMutationStage = StageManager.getStage(Stage.COUNTER_MUTATION);
            ExecutorService mutationStage = StageManager.getStage(Stage.MUTATION);
            if (mutationStage.isShutdown() && counterMutationStage.isShutdown())
                return; // drained already

            if (daemon != null)
                shutdownClientServers();
            ScheduledExecutors.optionalTasks.shutdown();
            Gossiper.instance.stop();

            // In-progress writes originating here could generate hints to be written, so shut down MessagingService
            // before mutation stage, so we can get all the hints saved before shutting down
            MessagingService.instance().shutdown();
            counterMutationStage.shutdown();
            mutationStage.shutdown();
            counterMutationStage.awaitTermination(3600, TimeUnit.SECONDS);
            mutationStage.awaitTermination(3600, TimeUnit.SECONDS);
            StorageProxy.instance.verifyNoHintsInProgress();

            List<Future<?>> flushes = new ArrayList<>();
            for (Keyspace keyspace : Keyspace.all())
            {
                KSMetaData ksm = Schema.instance.getKSMetaData(keyspace.getName());
                if (!ksm.durableWrites)
                {
                    for (ColumnFamilyStore cfs : keyspace.getColumnFamilyStores())
                        flushes.add(cfs.forceFlush());
                }
            }
            try
            {
                FBUtilities.waitOnFutures(flushes);
            }
            catch (Throwable t)
            {
                JVMStabilityInspector.inspectThrowable(t);
                // don't let this stop us from shutting down the commitlog and other thread pools
                logger.warn("Caught exception while waiting for memtable flushes during shutdown hook", t);
            }

            CommitLog.instance.shutdownBlocking();

            // wait for miscellaneous tasks like sstable and commitlog segment deletion
            ScheduledExecutors.nonPeriodicTasks.shutdown();
            if (!ScheduledExecutors.nonPeriodicTasks.awaitTermination(1, TimeUnit.MINUTES))
                logger.warn("Miscellaneous task executor still busy after one minute; proceeding with shutdown");
        }
    }, "StorageServiceShutdownHook");
    Runtime.getRuntime().addShutdownHook(drainOnShutdown);
#endif
    return prepare_to_join().then([this, delay] {
        return join_token_ring(delay);
    });
    // NOTE: the reference code below sits after the return above and is
    // additionally compiled out; it documents the still-unported
    // "cassandra.join_ring=false" (hibernate) start-up path.
#if 0
    // Has to be called after the host id has potentially changed in prepareToJoin().
    for (ColumnFamilyStore cfs : ColumnFamilyStore.all())
        if (cfs.metadata.isCounter())
            cfs.initCounterCache();

    if (Boolean.parseBoolean(System.getProperty("cassandra.join_ring", "true")))
    {
        joinTokenRing(delay);
    }
    else
    {
        Collection<Token> tokens = SystemKeyspace.getSavedTokens();
        if (!tokens.isEmpty())
        {
            _token_metadata.updateNormalTokens(tokens, FBUtilities.getBroadcastAddress());
            // order is important here, the gossiper can fire in between adding these two states.  It's ok to send TOKENS without STATUS, but *not* vice versa.
            List<Pair<ApplicationState, VersionedValue>> states = new ArrayList<Pair<ApplicationState, VersionedValue>>();
            states.add(Pair.create(ApplicationState.TOKENS, valueFactory.tokens(tokens)));
            states.add(Pair.create(ApplicationState.STATUS, valueFactory.hibernate(true)));
            Gossiper.instance.addLocalApplicationStates(states);
        }
        logger.info("Not joining ring as requested. Use JMX (StorageService->joinRing()) to initiate ring joining");
    }
#endif
}
// Copy shard 0's _token_metadata to every other shard.  Must be called on
// shard 0, which holds the authoritative copy.  Replications are serialized
// through _replicate_task (a one-unit semaphore) so concurrent calls cannot
// interleave; the chain is fire-and-forget, with failures logged only.
void storage_service::replicate_to_all_cores() {
    assert(engine().cpu_id() == 0);
    // FIXME: There is no back pressure. If the remote cores are slow, and
    // replication is called often, it will queue tasks to the semaphore
    // without end.
    _replicate_task.wait().then([this] {
        // The lambda captures a copy of the metadata so the remote shards
        // never touch shard 0's instance directly.
        return _the_storage_service.invoke_on_all([tm = _token_metadata] (storage_service& local_ss) {
            // Shard 0 already holds the authoritative copy; skip it.
            if (engine().cpu_id() != 0) {
                local_ss._token_metadata = tm;
            }
        });
    }).then_wrapped([this] (auto&& f) {
        try {
            // Release the semaphore before f.get() so that a failed
            // replication still frees the unit for the next caller.
            _replicate_task.signal();
            f.get();
        } catch (...) {
            print("storage_service: Fail to replicate _token_metadata\n");
        }
    });
}
} // namespace service

View File

@@ -32,6 +32,7 @@
#include "core/sleep.hh"
#include "gms/application_state.hh"
#include "db/system_keyspace.hh"
#include "core/semaphore.hh"
namespace service {
@@ -64,7 +65,7 @@ class storage_service : public gms::i_endpoint_state_change_subscriber
public:
static int RING_DELAY; // delay after which we assume ring has stablized
const locator::token_metadata& get_token_metadata() const {
locator::token_metadata& get_token_metadata() {
return _token_metadata;
}
private:
@@ -82,7 +83,7 @@ private:
}
else
#endif
return 30 * 1000;
return 5 * 1000;
}
/* This abstraction maintains the token/endpoint metadata information */
token_metadata _token_metadata;
@@ -119,16 +120,19 @@ private:
private InetAddress removingNode;
#endif
private:
/* Are we starting this node in bootstrap mode? */
private boolean isBootstrapMode;
bool _is_bootstrap_mode;
/* we bootstrap but do NOT join the ring unless told to do so */
private boolean isSurveyMode= Boolean.parseBoolean(System.getProperty("cassandra.write_survey", "false"));
#endif
private:
bool initialized;
// FIXME: System.getProperty("cassandra.write_survey", "false")
bool _is_survey_mode = false;
bool joined = false;
bool _initialized;
bool _joined = false;
#if 0
/* the probability for tracing any particular request, 0 disables tracing and 1 enables for all */
@@ -150,33 +154,15 @@ private:
#endif
private:
std::set<token> _bootstrap_tokens;
#if 0
std::unordered_set<token> _bootstrap_tokens;
public void finishBootstrapping()
{
isBootstrapMode = false;
public:
void finish_bootstrapping() {
_is_bootstrap_mode = false;
}
/** This method updates the local token on disk */
#endif
void set_tokens(std::set<token> tokens) {
// if (logger.isDebugEnabled())
// logger.debug("Setting tokens to {}", tokens);
// SystemKeyspace.updateTokens(tokens);
for (auto t : tokens) {
_token_metadata.update_normal_token(t, get_broadcast_address());
}
// Collection<Token> localTokens = getLocalTokens();
auto local_tokens = _bootstrap_tokens;
auto& gossiper = gms::get_local_gossiper();
gossiper.add_local_application_state(gms::application_state::TOKENS, value_factory.tokens(local_tokens));
gossiper.add_local_application_state(gms::application_state::STATUS, value_factory.bootstrapping(local_tokens));
//setMode(Mode.NORMAL, false);
}
void set_tokens(std::unordered_set<token> tokens);
#if 0
public void registerDaemon(CassandraDaemon daemon)
@@ -197,22 +183,22 @@ private:
// should only be called via JMX
public void stopGossiping()
{
if (initialized)
if (_initialized)
{
logger.warn("Stopping gossip by operator request");
Gossiper.instance.stop();
initialized = false;
_initialized = false;
}
}
// should only be called via JMX
public void startGossiping()
{
if (!initialized)
if (!_initialized)
{
logger.warn("Starting gossip by operator request");
Gossiper.instance.start((int) (System.currentTimeMillis() / 1000));
initialized = true;
_initialized = true;
}
}
@@ -321,11 +307,12 @@ private:
Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
StageManager.shutdownNow();
}
public boolean isInitialized()
{
return initialized;
#endif
public:
bool is_initialized() {
return _initialized;
}
#if 0
public void stopDaemon()
{
@@ -397,7 +384,7 @@ private:
// for testing only
public void unsafeInitialize() throws ConfigurationException
{
initialized = true;
_initialized = true;
Gossiper.instance.register(this);
Gossiper.instance.start((int) (System.currentTimeMillis() / 1000)); // needed for node-ring gathering.
Gossiper.instance.addLocalApplicationState(ApplicationState.NET_VERSION, valueFactory.networkVersion());
@@ -410,133 +397,7 @@ public:
return init_server(RING_DELAY);
}
future<> init_server(int delay) {
#if 0
logger.info("Cassandra version: {}", FBUtilities.getReleaseVersionString());
logger.info("Thrift API version: {}", cassandraConstants.VERSION);
logger.info("CQL supported versions: {} (default: {})", StringUtils.join(ClientState.getCQLSupportedVersion(), ","), ClientState.DEFAULT_CQL_VERSION);
#endif
initialized = true;
#if 0
try
{
// Ensure StorageProxy is initialized on start-up; see CASSANDRA-3797.
Class.forName("org.apache.cassandra.service.StorageProxy");
// also IndexSummaryManager, which is otherwise unreferenced
Class.forName("org.apache.cassandra.io.sstable.IndexSummaryManager");
}
catch (ClassNotFoundException e)
{
throw new AssertionError(e);
}
if (Boolean.parseBoolean(System.getProperty("cassandra.load_ring_state", "true")))
{
logger.info("Loading persisted ring state");
Multimap<InetAddress, Token> loadedTokens = SystemKeyspace.loadTokens();
Map<InetAddress, UUID> loadedHostIds = SystemKeyspace.loadHostIds();
for (InetAddress ep : loadedTokens.keySet())
{
if (ep.equals(FBUtilities.getBroadcastAddress()))
{
// entry has been mistakenly added, delete it
SystemKeyspace.removeEndpoint(ep);
}
else
{
_token_metadata.updateNormalTokens(loadedTokens.get(ep), ep);
if (loadedHostIds.containsKey(ep))
_token_metadata.update_host_id(loadedHostIds.get(ep), ep);
Gossiper.instance.addSavedEndpoint(ep);
}
}
}
// daemon threads, like our executors', continue to run while shutdown hooks are invoked
drainOnShutdown = new Thread(new WrappedRunnable()
{
@Override
public void runMayThrow() throws InterruptedException
{
ExecutorService counterMutationStage = StageManager.getStage(Stage.COUNTER_MUTATION);
ExecutorService mutationStage = StageManager.getStage(Stage.MUTATION);
if (mutationStage.isShutdown() && counterMutationStage.isShutdown())
return; // drained already
if (daemon != null)
shutdownClientServers();
ScheduledExecutors.optionalTasks.shutdown();
Gossiper.instance.stop();
// In-progress writes originating here could generate hints to be written, so shut down MessagingService
// before mutation stage, so we can get all the hints saved before shutting down
MessagingService.instance().shutdown();
counterMutationStage.shutdown();
mutationStage.shutdown();
counterMutationStage.awaitTermination(3600, TimeUnit.SECONDS);
mutationStage.awaitTermination(3600, TimeUnit.SECONDS);
StorageProxy.instance.verifyNoHintsInProgress();
List<Future<?>> flushes = new ArrayList<>();
for (Keyspace keyspace : Keyspace.all())
{
KSMetaData ksm = Schema.instance.getKSMetaData(keyspace.getName());
if (!ksm.durableWrites)
{
for (ColumnFamilyStore cfs : keyspace.getColumnFamilyStores())
flushes.add(cfs.forceFlush());
}
}
try
{
FBUtilities.waitOnFutures(flushes);
}
catch (Throwable t)
{
JVMStabilityInspector.inspectThrowable(t);
// don't let this stop us from shutting down the commitlog and other thread pools
logger.warn("Caught exception while waiting for memtable flushes during shutdown hook", t);
}
CommitLog.instance.shutdownBlocking();
// wait for miscellaneous tasks like sstable and commitlog segment deletion
ScheduledExecutors.nonPeriodicTasks.shutdown();
if (!ScheduledExecutors.nonPeriodicTasks.awaitTermination(1, TimeUnit.MINUTES))
logger.warn("Miscellaneous task executor still busy after one minute; proceeding with shutdown");
}
}, "StorageServiceShutdownHook");
Runtime.getRuntime().addShutdownHook(drainOnShutdown);
#endif
return prepare_to_join().then([this, delay] {
return join_token_ring(delay);
});
#if 0
// Has to be called after the host id has potentially changed in prepareToJoin().
for (ColumnFamilyStore cfs : ColumnFamilyStore.all())
if (cfs.metadata.isCounter())
cfs.initCounterCache();
if (Boolean.parseBoolean(System.getProperty("cassandra.join_ring", "true")))
{
joinTokenRing(delay);
}
else
{
Collection<Token> tokens = SystemKeyspace.getSavedTokens();
if (!tokens.isEmpty())
{
_token_metadata.updateNormalTokens(tokens, FBUtilities.getBroadcastAddress());
// order is important here, the gossiper can fire in between adding these two states. It's ok to send TOKENS without STATUS, but *not* vice versa.
List<Pair<ApplicationState, VersionedValue>> states = new ArrayList<Pair<ApplicationState, VersionedValue>>();
states.add(Pair.create(ApplicationState.TOKENS, valueFactory.tokens(tokens)));
states.add(Pair.create(ApplicationState.STATUS, valueFactory.hibernate(true)));
Gossiper.instance.addLocalApplicationStates(states);
}
logger.info("Not joining ring as requested. Use JMX (StorageService->joinRing()) to initiate ring joining");
}
#endif
}
future<> init_server(int delay);
#if 0
/**
* In the event of forceful termination we need to remove the shutdown hook to prevent hanging (OOM for instance)
@@ -560,38 +421,13 @@ private:
Gossiper.instance.addLocalApplicationState(ApplicationState.DC, StorageService.instance.valueFactory.datacenter(dc));
Gossiper.instance.addLocalApplicationState(ApplicationState.RACK, StorageService.instance.valueFactory.rack(rack));
}
public synchronized void joinRing() throws IOException
{
if (!joined)
{
logger.info("Joining ring by operator request");
try
{
joinTokenRing(0);
}
catch (ConfigurationException e)
{
throw new IOException(e.getMessage());
}
}
else if (isSurveyMode)
{
set_tokens(SystemKeyspace.getSavedTokens());
SystemKeyspace.setBootstrapState(SystemKeyspace.BootstrapState.COMPLETED);
isSurveyMode = false;
logger.info("Leaving write survey mode and joining ring at operator request");
assert _token_metadata.sortedTokens().size() > 0;
Auth.setup();
}
#endif
public:
void join_ring();
bool is_joined() {
return _joined;
}
public boolean isJoined()
{
return joined;
}
#if 0
public void rebuild(String sourceDc)
{
logger.info("rebuild from dc: {}", sourceDc == null ? "(any dc)" : sourceDc);
@@ -667,13 +503,14 @@ private:
}
#endif
future<> bootstrap(std::set<token> tokens);
#if 0
public boolean isBootstrapMode()
{
return isBootstrapMode;
future<> bootstrap(std::unordered_set<token> tokens);
bool is_bootstrap_mode() {
return _is_bootstrap_mode;
}
#if 0
public TokenMetadata getTokenMetadata()
{
return _token_metadata;
@@ -961,11 +798,8 @@ private:
}
#endif
public:
void before_change(gms::inet_address endpoint, gms::endpoint_state current_state, gms::application_state new_state_key, gms::versioned_value new_value) override
{
// no-op
}
virtual void on_join(gms::inet_address endpoint, gms::endpoint_state ep_state) override;
virtual void before_change(gms::inet_address endpoint, gms::endpoint_state current_state, gms::application_state new_state_key, gms::versioned_value new_value) override;
/*
* Handle the reception of a new particular ApplicationState for a particular endpoint. Note that the value of the
* ApplicationState has not necessarily "changed" since the last known value, if we already received the same update
@@ -998,159 +832,24 @@ public:
* Note: Any time a node state changes from STATUS_NORMAL, it will not be visible to new nodes. So it follows that
* you should never bootstrap a new node during a removenode, decommission or move.
*/
void on_change(inet_address endpoint, application_state state, versioned_value value) override {
ss_debug("SS::on_change endpoint=%s\n", endpoint);
if (state == application_state::STATUS) {
std::vector<sstring> pieces;
boost::split(pieces, value.value, boost::is_any_of(sstring(versioned_value::DELIMITER_STR)));
assert(pieces.size() > 0);
sstring move_name = pieces[0];
if (move_name == sstring(versioned_value::STATUS_BOOTSTRAPPING)) {
handle_state_bootstrap(endpoint);
} else if (move_name == sstring(versioned_value::STATUS_NORMAL)) {
handle_state_normal(endpoint);
} else if (move_name == sstring(versioned_value::REMOVING_TOKEN) ||
move_name == sstring(versioned_value::REMOVED_TOKEN)) {
handle_state_removing(endpoint, pieces);
} else if (move_name == sstring(versioned_value::STATUS_LEAVING)) {
handle_state_leaving(endpoint);
} else if (move_name == sstring(versioned_value::STATUS_LEFT)) {
handle_state_left(endpoint, pieces);
} else if (move_name == sstring(versioned_value::STATUS_MOVING)) {
handle_state_moving(endpoint, pieces);
}
} else {
auto& gossiper = gms::get_local_gossiper();
auto ep_state = gossiper.get_endpoint_state_for_endpoint(endpoint);
if (!ep_state || gossiper.is_dead_state(*ep_state)) {
// logger.debug("Ignoring state change for dead or unknown endpoint: {}", endpoint);
return;
}
if (state == application_state::RELEASE_VERSION) {
// SystemKeyspace.updatePeerInfo(endpoint, "release_version", value.value);
} else if (state == application_state::DC) {
// SystemKeyspace.updatePeerInfo(endpoint, "data_center", value.value);
} else if (state == application_state::RACK) {
// SystemKeyspace.updatePeerInfo(endpoint, "rack", value.value);
} else if (state == application_state::RPC_ADDRESS) {
// try {
// SystemKeyspace.updatePeerInfo(endpoint, "rpc_address", InetAddress.getByName(value.value));
// } catch (UnknownHostException e) {
// throw new RuntimeException(e);
// }
} else if (state == application_state::SCHEMA) {
// SystemKeyspace.updatePeerInfo(endpoint, "schema_version", UUID.fromString(value.value));
// MigrationManager.instance.scheduleSchemaPull(endpoint, epState);
} else if (state == application_state::HOST_ID) {
// SystemKeyspace.updatePeerInfo(endpoint, "host_id", UUID.fromString(value.value));
}
}
}
#if 0
private void updatePeerInfo(InetAddress endpoint)
{
EndpointState epState = Gossiper.instance.getEndpointStateForEndpoint(endpoint);
for (Map.Entry<ApplicationState, VersionedValue> entry : epState.getApplicationStateMap().entrySet())
{
switch (entry.getKey())
{
case RELEASE_VERSION:
SystemKeyspace.updatePeerInfo(endpoint, "release_version", entry.getValue().value);
break;
case DC:
SystemKeyspace.updatePeerInfo(endpoint, "data_center", entry.getValue().value);
break;
case RACK:
SystemKeyspace.updatePeerInfo(endpoint, "rack", entry.getValue().value);
break;
case RPC_ADDRESS:
try
{
SystemKeyspace.updatePeerInfo(endpoint, "rpc_address", InetAddress.getByName(entry.getValue().value));
}
catch (UnknownHostException e)
{
throw new RuntimeException(e);
}
break;
case SCHEMA:
SystemKeyspace.updatePeerInfo(endpoint, "schema_version", UUID.fromString(entry.getValue().value));
break;
case HOST_ID:
SystemKeyspace.updatePeerInfo(endpoint, "host_id", UUID.fromString(entry.getValue().value));
break;
}
}
}
#endif
virtual void on_change(inet_address endpoint, application_state state, versioned_value value) override;
virtual void on_alive(gms::inet_address endpoint, gms::endpoint_state state) override;
virtual void on_dead(gms::inet_address endpoint, gms::endpoint_state state) override;
virtual void on_remove(gms::inet_address endpoint) override;
virtual void on_restart(gms::inet_address endpoint, gms::endpoint_state state) override;
private:
sstring get_application_state_value(inet_address endpoint, application_state appstate) {
auto& gossiper = gms::get_local_gossiper();
auto eps = gossiper.get_endpoint_state_for_endpoint(endpoint);
if (!eps) {
return {};
}
auto v = eps->get_application_state(appstate);
if (!v) {
return {};
}
return v->value;
}
std::set<token> get_tokens_for(inet_address endpoint) {
auto tokens_string = get_application_state_value(endpoint, application_state::TOKENS);
ss_debug("endpoint=%s, tokens_string=%s\n", endpoint, tokens_string);
std::vector<sstring> tokens;
std::set<token> ret;
boost::split(tokens, tokens_string, boost::is_any_of(";"));
for (auto str : tokens) {
ss_debug("token=%s\n", str);
sstring_view sv(str);
bytes b = from_hex(sv);
ret.emplace(token::kind::key, b);
}
return ret;
}
void update_peer_info(inet_address endpoint);
sstring get_application_state_value(inet_address endpoint, application_state appstate);
std::unordered_set<token> get_tokens_for(inet_address endpoint);
void replicate_to_all_cores();
semaphore _replicate_task{1};
private:
/**
* Handle node bootstrap
*
* @param endpoint bootstrapping node
*/
void handle_state_bootstrap(inet_address endpoint) {
// Not yet ported: currently a no-op. The disabled block below is the
// origin (Cassandra, Java) reference implementation, kept verbatim as
// a porting guide: it records the bootstrapping node's tokens as
// pending ranges and associates its host id.
#if 0
Collection<Token> tokens;
// explicitly check for TOKENS, because a bootstrapping node might be bootstrapping in legacy mode; that is, not using vnodes and no token specified
tokens = get_tokens_for(endpoint);
if (logger.isDebugEnabled())
logger.debug("Node {} state bootstrapping, token {}", endpoint, tokens);
// if this node is present in token metadata, either we have missed intermediate states
// or the node had crashed. Print warning if needed, clear obsolete stuff and
// continue.
if (_token_metadata.isMember(endpoint))
{
// If isLeaving is false, we have missed both LEAVING and LEFT. However, if
// isLeaving is true, we have only missed LEFT. Waiting time between completing
// leave operation and rebootstrapping is relatively short, so the latter is quite
// common (not enough time for gossip to spread). Therefore we report only the
// former in the log.
if (!_token_metadata.isLeaving(endpoint))
logger.info("Node {} state jump to bootstrap", endpoint);
_token_metadata.removeEndpoint(endpoint);
}
_token_metadata.addBootstrapTokens(tokens, endpoint);
PendingRangeCalculatorService.instance.update();
if (Gossiper.instance.usesHostId(endpoint))
_token_metadata.update_host_id(Gossiper.instance.getHostId(endpoint), endpoint);
#endif
}
void handle_state_bootstrap(inet_address endpoint);
/**
* Handle node move to normal state. That is, node is entering token ring and participating
@@ -1158,167 +857,14 @@ private:
*
* @param endpoint node
*/
void handle_state_normal(inet_address endpoint) {
// Not yet ported: currently a no-op. The disabled block below is the
// origin (Cassandra, Java) reference implementation, kept verbatim as
// a porting guide: it resolves host-id and token-ownership collisions,
// updates token metadata and the system keyspace, and notifies
// lifecycle subscribers of joins/moves.
#if 0
Collection<Token> tokens;
tokens = get_tokens_for(endpoint);
Set<Token> tokensToUpdateInMetadata = new HashSet<>();
Set<Token> tokensToUpdateInSystemKeyspace = new HashSet<>();
Set<Token> localTokensToRemove = new HashSet<>();
Set<InetAddress> endpointsToRemove = new HashSet<>();
if (logger.isDebugEnabled())
logger.debug("Node {} state normal, token {}", endpoint, tokens);
if (_token_metadata.isMember(endpoint))
logger.info("Node {} state jump to normal", endpoint);
updatePeerInfo(endpoint);
// Order Matters, TM.updateHostID() should be called before TM.updateNormalToken(), (see CASSANDRA-4300).
if (Gossiper.instance.usesHostId(endpoint))
{
UUID hostId = Gossiper.instance.getHostId(endpoint);
InetAddress existing = _token_metadata.getEndpointForHostId(hostId);
if (DatabaseDescriptor.isReplacing() && Gossiper.instance.getEndpointStateForEndpoint(DatabaseDescriptor.getReplaceAddress()) != null && (hostId.equals(Gossiper.instance.getHostId(DatabaseDescriptor.getReplaceAddress()))))
logger.warn("Not updating token metadata for {} because I am replacing it", endpoint);
else
{
if (existing != null && !existing.equals(endpoint))
{
if (existing.equals(FBUtilities.getBroadcastAddress()))
{
logger.warn("Not updating host ID {} for {} because it's mine", hostId, endpoint);
_token_metadata.removeEndpoint(endpoint);
endpointsToRemove.add(endpoint);
}
else if (Gossiper.instance.compareEndpointStartup(endpoint, existing) > 0)
{
logger.warn("Host ID collision for {} between {} and {}; {} is the new owner", hostId, existing, endpoint, endpoint);
_token_metadata.removeEndpoint(existing);
endpointsToRemove.add(existing);
_token_metadata.update_host_id(hostId, endpoint);
}
else
{
logger.warn("Host ID collision for {} between {} and {}; ignored {}", hostId, existing, endpoint, endpoint);
_token_metadata.removeEndpoint(endpoint);
endpointsToRemove.add(endpoint);
}
}
else
_token_metadata.update_host_id(hostId, endpoint);
}
}
for (final Token token : tokens)
{
// we don't want to update if this node is responsible for the token and it has a later startup time than endpoint.
InetAddress currentOwner = _token_metadata.getEndpoint(token);
if (currentOwner == null)
{
logger.debug("New node {} at token {}", endpoint, token);
tokensToUpdateInMetadata.add(token);
tokensToUpdateInSystemKeyspace.add(token);
}
else if (endpoint.equals(currentOwner))
{
// set state back to normal, since the node may have tried to leave, but failed and is now back up
tokensToUpdateInMetadata.add(token);
tokensToUpdateInSystemKeyspace.add(token);
}
else if (Gossiper.instance.compareEndpointStartup(endpoint, currentOwner) > 0)
{
tokensToUpdateInMetadata.add(token);
tokensToUpdateInSystemKeyspace.add(token);
// currentOwner is no longer current, endpoint is. Keep track of these moves, because when
// a host no longer has any tokens, we'll want to remove it.
Multimap<InetAddress, Token> epToTokenCopy = getTokenMetadata().getEndpointToTokenMapForReading();
epToTokenCopy.get(currentOwner).remove(token);
if (epToTokenCopy.get(currentOwner).size() < 1)
endpointsToRemove.add(currentOwner);
logger.info(String.format("Nodes %s and %s have the same token %s. %s is the new owner",
endpoint,
currentOwner,
token,
endpoint));
}
else
{
logger.info(String.format("Nodes %s and %s have the same token %s. Ignoring %s",
endpoint,
currentOwner,
token,
endpoint));
}
}
boolean isMoving = _token_metadata.isMoving(endpoint); // capture because updateNormalTokens clears moving status
_token_metadata.updateNormalTokens(tokensToUpdateInMetadata, endpoint);
for (InetAddress ep : endpointsToRemove)
{
removeEndpoint(ep);
if (DatabaseDescriptor.isReplacing() && DatabaseDescriptor.getReplaceAddress().equals(ep))
Gossiper.instance.replacementQuarantine(ep); // quarantine locally longer than normally; see CASSANDRA-8260
}
if (!tokensToUpdateInSystemKeyspace.isEmpty())
SystemKeyspace.updateTokens(endpoint, tokensToUpdateInSystemKeyspace);
if (!localTokensToRemove.isEmpty())
SystemKeyspace.updateLocalTokens(Collections.<Token>emptyList(), localTokensToRemove);
if (isMoving)
{
_token_metadata.removeFromMoving(endpoint);
for (IEndpointLifecycleSubscriber subscriber : lifecycleSubscribers)
subscriber.onMove(endpoint);
}
else
{
for (IEndpointLifecycleSubscriber subscriber : lifecycleSubscribers)
subscriber.onJoinCluster(endpoint);
}
PendingRangeCalculatorService.instance.update();
#endif
}
void handle_state_normal(inet_address endpoint);
/**
* Handle node preparing to leave the ring
*
* @param endpoint node
*/
void handle_state_leaving(inet_address endpoint) {
// Not yet ported: currently a no-op. The disabled block below is the
// origin (Cassandra, Java) reference implementation, kept verbatim as
// a porting guide: it makes sure the leaving node's tokens are in the
// ring, then marks the endpoint as leaving and recalculates pending
// ranges.
#if 0
Collection<Token> tokens;
tokens = get_tokens_for(endpoint);
if (logger.isDebugEnabled())
logger.debug("Node {} state leaving, tokens {}", endpoint, tokens);
// If the node is previously unknown or tokens do not match, update tokenmetadata to
// have this node as 'normal' (it must have been using this token before the
// leave). This way we'll get pending ranges right.
if (!_token_metadata.isMember(endpoint))
{
logger.info("Node {} state jump to leaving", endpoint);
_token_metadata.updateNormalTokens(tokens, endpoint);
}
else if (!_token_metadata.getTokens(endpoint).containsAll(tokens))
{
logger.warn("Node {} 'leaving' token mismatch. Long network partition?", endpoint);
_token_metadata.updateNormalTokens(tokens, endpoint);
}
// at this point the endpoint is certainly a member with this token, so let's proceed
// normally
_token_metadata.addLeavingEndpoint(endpoint);
PendingRangeCalculatorService.instance.update();
#endif
}
void handle_state_leaving(inet_address endpoint);
/**
* Handle node leaving the ring. This will happen when a node is decommissioned
@@ -1326,18 +872,7 @@ private:
* @param endpoint If reason for leaving is decommission, endpoint is the leaving node.
* @param pieces STATE_LEFT,token
*/
void handle_state_left(inet_address endpoint, std::vector<sstring> pieces) {
// Not yet ported: currently a no-op. The disabled block below is the
// origin (Cassandra, Java) reference implementation, kept verbatim as
// a porting guide: it excises the departed node's tokens using the
// expiry time encoded in the gossip pieces.
#if 0
assert pieces.length >= 2;
Collection<Token> tokens;
tokens = get_tokens_for(endpoint);
if (logger.isDebugEnabled())
logger.debug("Node {} state left, tokens {}", endpoint, tokens);
excise(tokens, endpoint, extractExpireTime(pieces));
#endif
}
void handle_state_left(inet_address endpoint, std::vector<sstring> pieces);
/**
* Handle node moving inside the ring.
@@ -1345,19 +880,7 @@ private:
* @param endpoint moving endpoint address
* @param pieces STATE_MOVING, token
*/
void handle_state_moving(inet_address endpoint, std::vector<sstring> pieces) {
// Not yet ported: currently a no-op. The disabled block below is the
// origin (Cassandra, Java) reference implementation, kept verbatim as
// a porting guide: it parses the target token from the gossip pieces
// and records the endpoint as moving.
#if 0
assert pieces.length >= 2;
Token token = getPartitioner().getTokenFactory().fromString(pieces[1]);
if (logger.isDebugEnabled())
logger.debug("Node {} state moving, new token {}", endpoint, token);
_token_metadata.addMovingEndpoint(token, endpoint);
PendingRangeCalculatorService.instance.update();
#endif
}
void handle_state_moving(inet_address endpoint, std::vector<sstring> pieces);
/**
* Handle notification that a node being actively removed from the ring via 'removenode'
@@ -1365,59 +888,9 @@ private:
* @param endpoint node
* @param pieces either REMOVED_TOKEN (node is gone) or REMOVING_TOKEN (replicas need to be restored)
*/
void handle_state_removing(inet_address endpoint, std::vector<sstring> pieces) {
// Not yet ported: currently a no-op. The disabled block below is the
// origin (Cassandra, Java) reference implementation, kept verbatim as
// a porting guide: it handles both REMOVED_TOKEN (node fully gone) and
// REMOVING_TOKEN (replicas still need restoring), including the
// self-removal case.
#if 0
assert (pieces.length > 0);
if (endpoint.equals(FBUtilities.getBroadcastAddress()))
{
logger.info("Received removenode gossip about myself. Is this node rejoining after an explicit removenode?");
try
{
drain();
}
catch (Exception e)
{
throw new RuntimeException(e);
}
return;
}
if (_token_metadata.isMember(endpoint))
{
String state = pieces[0];
Collection<Token> removeTokens = _token_metadata.getTokens(endpoint);
if (VersionedValue.REMOVED_TOKEN.equals(state))
{
excise(removeTokens, endpoint, extractExpireTime(pieces));
}
else if (VersionedValue.REMOVING_TOKEN.equals(state))
{
if (logger.isDebugEnabled())
logger.debug("Tokens {} removed manually (endpoint was {})", removeTokens, endpoint);
// Note that the endpoint is being removed
_token_metadata.addLeavingEndpoint(endpoint);
PendingRangeCalculatorService.instance.update();
// find the endpoint coordinating this removal that we need to notify when we're done
String[] coordinator = Gossiper.instance.getEndpointStateForEndpoint(endpoint).getApplicationState(ApplicationState.REMOVAL_COORDINATOR).value.split(VersionedValue.DELIMITER_STR, -1);
UUID hostId = UUID.fromString(coordinator[1]);
// grab any data we are now responsible for and notify responsible node
restoreReplicaCount(endpoint, _token_metadata.getEndpointForHostId(hostId));
}
}
else // now that the gossiper has told us about this nonexistent member, notify the gossiper to remove it
{
if (VersionedValue.REMOVED_TOKEN.equals(pieces[0]))
addExpireTimeIfFound(endpoint, extractExpireTime(pieces));
removeEndpoint(endpoint);
}
#endif
}
void handle_state_removing(inet_address endpoint, std::vector<sstring> pieces);
#if 0
private void excise(Collection<Token> tokens, InetAddress endpoint)
{
logger.info("Removing tokens {} for {}", tokens, endpoint);
@@ -1625,61 +1098,7 @@ private:
return changedRanges;
}
#endif
public:
// Gossip callback: a previously unknown endpoint has joined. Traces the
// tokens it gossiped, then replays every entry of its application-state
// map through on_change() so the usual per-state handling runs.
void on_join(gms::inet_address endpoint, gms::endpoint_state ep_state) override {
    ss_debug("SS::on_join endpoint=%s\n", endpoint);
    for (auto&& tok : get_tokens_for(endpoint)) {
        ss_debug("t=%s\n", tok);
    }
    for (auto&& entry : ep_state.get_application_state_map()) {
        on_change(endpoint, entry.first, entry.second);
    }
    // MigrationManager.instance.scheduleSchemaPull(endpoint, epState);
}
// Gossip callback: endpoint marked alive. Only the debug trace is
// implemented; the disabled origin (Cassandra, Java) logic below —
// schema pull, hint delivery, lifecycle notification — is kept verbatim
// as a porting reference.
void on_alive(gms::inet_address endpoint, gms::endpoint_state state) override
{
ss_debug("SS::on_alive endpoint=%s\n", endpoint);
#if 0
MigrationManager.instance.scheduleSchemaPull(endpoint, state);
if (_token_metadata.isMember(endpoint))
{
HintedHandOffManager.instance.scheduleHintDelivery(endpoint, true);
for (IEndpointLifecycleSubscriber subscriber : lifecycleSubscribers)
subscriber.onUp(endpoint);
}
#endif
}
// Gossip callback: endpoint removed. Currently a no-op; the disabled
// origin logic (drop from token metadata, recompute pending ranges) is
// kept verbatim as a porting reference.
void on_remove(gms::inet_address endpoint) override
{
#if 0
_token_metadata.removeEndpoint(endpoint);
PendingRangeCalculatorService.instance.update();
#endif
}
// Gossip callback: endpoint marked dead. Currently a no-op; the
// disabled origin logic (convict in messaging service, notify lifecycle
// subscribers) is kept verbatim as a porting reference.
void on_dead(gms::inet_address endpoint, gms::endpoint_state state) override
{
#if 0
MessagingService.instance().convict(endpoint);
for (IEndpointLifecycleSubscriber subscriber : lifecycleSubscribers)
subscriber.onDown(endpoint);
#endif
}
// Gossip callback: endpoint restarted. Currently a no-op; the disabled
// origin logic (treat an alive-restart as a death first so connection
// pools get reset) is kept verbatim as a porting reference.
void on_restart(gms::inet_address endpoint, gms::endpoint_state state) override
{
#if 0
// If we have restarted before the node was even marked down, we need to reset the connection pool
if (state.isAlive())
onDead(endpoint, state);
#endif
}
#if 0
/** raw load value */
public double getLoad()
{

View File

@@ -20,7 +20,7 @@ int main(int ac, char ** av) {
auto port = server.port();
auto listen = server.listen_address();
print("Messaging server listening on ip %s port %d ...\n", listen, port);
gms::get_failure_detector().start_single().then([config] {
gms::get_failure_detector().start().then([config] {
gms::get_gossiper().start_single().then([config] {
std::set<gms::inet_address> seeds;
for (auto s : config["seed"].as<std::vector<std::string>>()) {

View File

@@ -10,7 +10,7 @@
SEASTAR_TEST_CASE(test_boot_shutdown){
return net::get_messaging_service().start(gms::inet_address("127.0.0.1")).then( [] () {
return gms::get_failure_detector().start_single().then([] {
return gms::get_failure_detector().start().then([] {
return gms::get_gossiper().start_single().then([] {
return gms::get_gossiper().stop().then( [] (){
return gms::get_failure_detector().stop().then( [] (){

View File

@@ -28,6 +28,7 @@ public:
UUID() : most_sig_bits(0), least_sig_bits(0) {}
UUID(int64_t most_sig_bits, int64_t least_sig_bits)
: most_sig_bits(most_sig_bits), least_sig_bits(least_sig_bits) {}
explicit UUID(const sstring& uuid_string);
int64_t get_most_significant_bits() const {
return most_sig_bits;

View File

@@ -7,6 +7,9 @@
#include "net/byteorder.hh"
#include <random>
#include <boost/iterator/function_input_iterator.hpp>
#include <boost/algorithm/string.hpp>
#include <string>
#include "core/sstring.hh"
namespace utils {
@@ -35,4 +38,16 @@ std::ostream& operator<<(std::ostream& out, const UUID& uuid) {
return out << uuid.to_sstring();
}
// Construct a UUID from its textual form
// ("xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx"). Dashes are stripped before
// parsing, so a bare 32-digit hex string is accepted as well. The two
// 64-bit halves are parsed as unsigned hex via std::stoull.
UUID::UUID(const sstring& uuid) {
    auto uuid_string = uuid;
    boost::erase_all(uuid_string, "-");
    // Require exactly 32 hex digits. The previous check
    // (uuid_string.size() / 2 == 16) also accepted 33-character inputs
    // because of integer truncation, leaving a malformed trailing digit
    // for stoull to choke on (or overflow with) below.
    assert(uuid_string.size() == 32);
    auto half = uuid_string.size() / 2;
    sstring most = sstring(uuid_string.begin(), uuid_string.begin() + half);
    sstring least = sstring(uuid_string.begin() + half, uuid_string.end());
    int base = 16;
    this->most_sig_bits = std::stoull(most, nullptr, base);
    this->least_sig_bits = std::stoull(least, nullptr, base);
}
}