Merge "remove and decommission node support part 2" from Asias

"More preparatory patches for remove and decommission node support:

- stream hints and reanges
- unbootstrap
- replication finished notification"
This commit is contained in:
Avi Kivity
2015-10-21 12:24:14 +03:00
5 changed files with 228 additions and 205 deletions

View File

@@ -557,5 +557,16 @@ future<> messaging_service::send_truncate(shard_id id, std::chrono::milliseconds
return send_message_timeout<void>(this, net::messaging_verb::TRUNCATE, std::move(id), std::move(timeout), std::move(ks), std::move(cf));
}
// Wrapper for REPLICATION_FINISHED
void messaging_service::register_replication_finished(std::function<future<> (inet_address)>&& func) {
register_handler(this, messaging_verb::REPLICATION_FINISHED, std::move(func));
}
void messaging_service::unregister_replication_finished() {
_rpc->unregister_handler(messaging_verb::REPLICATION_FINISHED);
}
future<> messaging_service::send_replication_finished(shard_id id, inet_address from) {
// FIXME: getRpcTimeout : conf.request_timeout_in_ms
return send_message_timeout<void>(this, messaging_verb::REPLICATION_FINISHED, std::move(id), 10000ms, std::move(from));
}
} // namespace net

View File

@@ -533,6 +533,11 @@ public:
void unregister_truncate();
future<> send_truncate(shard_id, std::chrono::milliseconds, sstring, sstring);
// Wrapper for REPLICATION_FINISHED verb
void register_replication_finished(std::function<future<> (inet_address from)>&& func);
void unregister_replication_finished();
future<> send_replication_finished(shard_id id, inet_address from);
public:
// Return rpc::protocol::client for a shard which is a ip + cpuid pair.
shared_ptr<rpc_protocol_client_wrapper> get_rpc_client(messaging_verb verb, shard_id id);

View File

@@ -2517,6 +2517,11 @@ void storage_proxy::init_messaging_service() {
return sp._db.local().truncate(truncated_at, ksname, cfname);
});
});
ms.register_replication_finished([] (gms::inet_address from) {
get_local_storage_service().confirm_replication(from);
return make_ready_future<>();
});
}
void storage_proxy::uninit_messaging_service() {
@@ -2529,6 +2534,7 @@ void storage_proxy::uninit_messaging_service() {
ms.unregister_read_mutation_data();
ms.unregister_read_digest();
ms.unregister_truncate();
ms.unregister_replication_finished();
}
// Merges reconcilable_result:s from different shards into one

View File

@@ -424,7 +424,6 @@ void storage_service::handle_state_bootstrap(inet_address endpoint) {
}
_token_metadata.add_bootstrap_tokens(tokens, endpoint);
// FIXME
get_local_pending_range_calculator_service().update().get();
auto& gossiper = gms::get_local_gossiper();
@@ -454,10 +453,10 @@ void storage_service::handle_state_normal(inet_address endpoint) {
if (gossiper.uses_host_id(endpoint)) {
auto host_id = gossiper.get_host_id(endpoint);
auto existing = _token_metadata.get_endpoint_for_host_id(host_id);
// if (DatabaseDescriptor.isReplacing() &&
// Gossiper.instance.getEndpointStateForEndpoint(DatabaseDescriptor.getReplaceAddress()) != null &&
// (hostId.equals(Gossiper.instance.getHostId(DatabaseDescriptor.getReplaceAddress())))) {
if (false) {
if (is_replacing() &&
get_replace_address() &&
gossiper.get_endpoint_state_for_endpoint(get_replace_address().value()) &&
(host_id == gossiper.get_host_id(get_replace_address().value()))) {
logger.warn("Not updating token metadata for {} because I am replacing it", endpoint);
} else {
if (existing && *existing != endpoint) {
@@ -584,9 +583,7 @@ void storage_service::handle_state_leaving(inet_address endpoint) {
// at this point the endpoint is certainly a member with this token, so let's proceed
// normally
#if 0
_token_metadata.addLeavingEndpoint(endpoint);
#endif
_token_metadata.add_leaving_endpoint(endpoint);
get_local_pending_range_calculator_service().update().get();
}
@@ -630,18 +627,24 @@ void storage_service::handle_state_removing(inet_address endpoint, std::vector<s
if (sstring(gms::versioned_value::REMOVED_TOKEN) == state) {
// excise(removeTokens, endpoint, extractExpireTime(pieces));
} else if (sstring(gms::versioned_value::REMOVING_TOKEN) == state) {
#if 0
auto& gossiper = gms::get_local_gossiper();
logger.debug("Tokens {} removed manually (endpoint was {})", remove_tokens, endpoint);
// Note that the endpoint is being removed
_token_metadata.addLeavingEndpoint(endpoint);
PendingRangeCalculatorService.instance.update();
_token_metadata.add_leaving_endpoint(endpoint);
get_local_pending_range_calculator_service().update().get();
// find the endpoint coordinating this removal that we need to notify when we're done
String[] coordinator = Gossiper.instance.getEndpointStateForEndpoint(endpoint).getApplicationState(ApplicationState.REMOVAL_COORDINATOR).value.split(VersionedValue.DELIMITER_STR, -1);
UUID hostId = UUID.fromString(coordinator[1]);
auto state = gossiper.get_endpoint_state_for_endpoint(endpoint);
assert(state);
auto value = state->get_application_state(application_state::REMOVAL_COORDINATOR);
assert(value);
std::vector<sstring> coordinator;
boost::split(coordinator, value->value, boost::is_any_of(sstring(versioned_value::DELIMITER_STR)));
assert(coordinator.size() == 2);
UUID host_id(coordinator[1]);
// grab any data we are now responsible for and notify responsible node
restoreReplicaCount(endpoint, _token_metadata.getEndpointForHostId(hostId));
#endif
auto ep = _token_metadata.get_endpoint_for_host_id(host_id);
assert(ep);
restore_replica_count(endpoint, ep.value()).get();
}
} else { // now that the gossiper has told us about this nonexistent member, notify the gossiper to remove it
if (sstring(gms::versioned_value::REMOVED_TOKEN) == pieces[0]) {
@@ -1458,15 +1461,15 @@ future<> storage_service::decommission() {
set_mode(mode::LEAVING, sprint("sleeping %s ms for batch processing and pending range setup", timeout), true);
sleep(std::chrono::milliseconds(timeout)).get();
unbootstrap().finally([this] {
// FIXME: proper shutdown
// shutdownClientServers();
gms::get_local_gossiper().stop();
// MessagingService.instance().shutdown();
// StageManager.shutdownNow();
set_mode(mode::DECOMMISSIONED, true);
// let op be responsible for killing the process
}).get();
unbootstrap();
// FIXME: proper shutdown
// shutdownClientServers();
gms::get_local_gossiper().stop();
// MessagingService.instance().shutdown();
// StageManager.shutdownNow();
set_mode(mode::DECOMMISSIONED, true);
// let op be responsible for killing the process
});
}
@@ -1533,7 +1536,7 @@ future<> storage_service::remove_node(sstring host_id_string) {
gossiper.advertise_removing(endpoint, host_id, local_host_id);
// kick off streaming commands
// restoreReplicaCount(endpoint, myAddress);
restore_replica_count(endpoint, my_address).get();
// wait for ReplicationFinishedVerbHandler to signal we're done
while (!_replicating_nodes.empty()) {
@@ -1766,27 +1769,29 @@ std::unordered_multimap<range<token>, inet_address> storage_service::get_changed
return changed_ranges;
}
future<> storage_service::unbootstrap() {
return make_ready_future<>();
#if 0
Map<String, Multimap<Range<Token>, InetAddress>> rangesToStream = new HashMap<>();
// Runs inside seastar::async context
void storage_service::unbootstrap() {
std::unordered_map<sstring, std::unordered_multimap<range<token>, inet_address>> ranges_to_stream;
for (String keyspaceName : Schema.instance.getNonSystemKeyspaces())
{
Multimap<Range<Token>, InetAddress> rangesMM = getChangedRangesForLeaving(keyspaceName, FBUtilities.getBroadcastAddress());
if (logger.isDebugEnabled())
logger.debug("Ranges needing transfer are [{}]", StringUtils.join(rangesMM.keySet(), ","));
rangesToStream.put(keyspaceName, rangesMM);
auto non_system_keyspaces = _db.local().get_non_system_keyspaces();
for (const auto& keyspace_name : non_system_keyspaces) {
auto ranges_mm = get_changed_ranges_for_leaving(keyspace_name, get_broadcast_address());
if (logger.is_enabled(logging::log_level::debug)) {
std::vector<range<token>> ranges;
for (auto& x : ranges_mm) {
ranges.push_back(x.first);
}
logger.debug("Ranges needing transfer are [{}]", ranges);
}
ranges_to_stream.emplace(keyspace_name, std::move(ranges_mm));
}
setMode(Mode.LEAVING, "replaying batch log and streaming data to other nodes", true);
set_mode(mode::LEAVING, "replaying batch log and streaming data to other nodes", true);
// Start with BatchLog replay, which may create hints but no writes since this is no longer a valid endpoint.
Future<?> batchlogReplay = BatchlogManager.instance.startBatchlogReplay();
Future<StreamState> streamSuccess = streamRanges(rangesToStream);
// FIXME: Future<?> batchlogReplay = BatchlogManager.instance.startBatchlogReplay();
auto stream_success = stream_ranges(ranges_to_stream);
#if 0
// Wait for batch log to complete before streaming hints.
logger.debug("waiting for batch log processing.");
try
@@ -1797,26 +1802,23 @@ future<> storage_service::unbootstrap() {
{
throw new RuntimeException(e);
}
#endif
setMode(Mode.LEAVING, "streaming hints to other nodes", true);
set_mode(mode::LEAVING, "streaming hints to other nodes", true);
Future<StreamState> hintsSuccess = streamHints();
auto hints_success = stream_hints();
// wait for the transfer runnables to signal the latch.
logger.debug("waiting for stream acks.");
try
{
streamSuccess.get();
hintsSuccess.get();
}
catch (ExecutionException | InterruptedException e)
{
throw new RuntimeException(e);
try {
auto stream_state = stream_success.get0();
auto hints_state = hints_success.get0();
} catch (...) {
logger.warn("unbootstrap fails to stream : {}", std::current_exception());
throw;
}
logger.debug("stream acks all received.");
leaveRing();
onFinish.run();
#endif
leave_ring();
}
future<> storage_service::restore_replica_count(inet_address endpoint, inet_address notify_endpoint) {
@@ -1857,11 +1859,11 @@ future<> storage_service::restore_replica_count(inet_address endpoint, inet_addr
return sp.execute().then_wrapped([this, notify_endpoint] (auto&& f) {
try {
auto state = f.get0();
this->send_replication_notification(notify_endpoint);
return this->send_replication_notification(notify_endpoint);
} catch (...) {
logger.warn("Streaming to restore replica count failed: {}", std::current_exception());
// We still want to send the notification
this->send_replication_notification(notify_endpoint);
return this->send_replication_notification(notify_endpoint);
}
return make_ready_future<>();
});
@@ -1875,12 +1877,12 @@ void storage_service::excise(std::unordered_set<token> tokens, inet_address endp
_token_metadata.remove_endpoint(endpoint);
_token_metadata.remove_bootstrap_tokens(tokens);
// FIXME: IEndpointLifecycleSubscriber
#if 0
for (IEndpointLifecycleSubscriber subscriber : lifecycleSubscribers) {
subscriber.onLeaveCluster(endpoint);
}
#endif
get_storage_service().invoke_on_all([endpoint] (auto&& ss) {
for (auto&& subscriber : ss._lifecycle_subscribers) {
subscriber->on_leave_cluster(endpoint);
}
}).get();
get_local_pending_range_calculator_service().update().get();
}
@@ -1889,4 +1891,132 @@ void storage_service::excise(std::unordered_set<token> tokens, inet_address endp
excise(tokens, endpoint);
}
future<> storage_service::send_replication_notification(inet_address remote) {
// notify the remote token
auto done = make_shared<bool>(false);
auto local = get_broadcast_address();
logger.debug("Notifying {} of replication completion", remote);
return do_until(
[done, remote] {
return *done || !gms::get_local_failure_detector().is_alive(remote);
},
[done, remote, local] {
auto& ms = net::get_local_messaging_service();
net::shard_id id{remote, 0};
return ms.send_replication_finished(id, local).then_wrapped([id, done] (auto&& f) {
try {
f.get();
*done = true;
} catch (...) {
logger.warn("Fail to send REPLICATION_FINISHED to {}: {}", id, std::current_exception());
}
});
}
);
}
void storage_service::confirm_replication(inet_address node) {
// replicatingNodes can be empty in the case where this node used to be a removal coordinator,
// but restarted before all 'replication finished' messages arrived. In that case, we'll
// still go ahead and acknowledge it.
if (!_replicating_nodes.empty()) {
_replicating_nodes.erase(node);
} else {
logger.info("Received unexpected REPLICATION_FINISHED message from {}. Was this node recently a removal coordinator?", node);
}
}
// Runs inside seastar::async context
void storage_service::leave_ring() {
db::system_keyspace::set_bootstrap_state(db::system_keyspace::bootstrap_state::NEEDS_BOOTSTRAP).get();
_token_metadata.remove_endpoint(get_broadcast_address());
get_local_pending_range_calculator_service().update().get();
auto& gossiper = gms::get_local_gossiper();
auto expire_time = gossiper.compute_expire_time().time_since_epoch().count();
gossiper.add_local_application_state(gms::application_state::STATUS, value_factory.left(get_local_tokens(), expire_time));
auto delay = std::max(std::chrono::milliseconds(RING_DELAY), gms::gossiper::INTERVAL);
logger.info("Announcing that I have left the ring for {}ms", delay.count());
sleep(delay).get();
}
future<streaming::stream_state>
storage_service::stream_ranges(std::unordered_map<sstring, std::unordered_multimap<range<token>, inet_address>> ranges_to_stream_by_keyspace) {
using stream_plan = streaming::stream_plan;
// First, we build a list of ranges to stream to each host, per table
std::unordered_map<sstring, std::unordered_map<inet_address, std::vector<range<token>>>> sessions_to_stream_by_keyspace;
for (auto& entry : ranges_to_stream_by_keyspace) {
const auto& keyspace = entry.first;
auto& ranges_with_endpoints = entry.second;
if (ranges_with_endpoints.empty()) {
continue;
}
std::unordered_map<inet_address, std::vector<range<token>>> ranges_per_endpoint;
for (auto& end_point_entry : ranges_with_endpoints) {
range<token> r = end_point_entry.first;
inet_address endpoint = end_point_entry.second;
ranges_per_endpoint[endpoint].emplace_back(r);
}
sessions_to_stream_by_keyspace.emplace(keyspace, std::move(ranges_per_endpoint));
}
stream_plan sp("Unbootstrap", true);
for (auto& entry : sessions_to_stream_by_keyspace) {
const auto& keyspace_name = entry.first;
// TODO: we can move to avoid copy of std::vector
auto& ranges_per_endpoint = entry.second;
for (auto& ranges_entry : ranges_per_endpoint) {
auto& ranges = ranges_entry.second;
auto new_endpoint = ranges_entry.first;
auto preferred = new_endpoint; // FIXME: SystemKeyspace.getPreferredIP(newEndpoint);
// TODO each call to transferRanges re-flushes, this is potentially a lot of waste
sp.transfer_ranges(new_endpoint, preferred, keyspace_name, ranges);
}
}
return sp.execute();
}
future<streaming::stream_state> storage_service::stream_hints() {
// FIXME: flush hits column family
#if 0
// StreamPlan will not fail if there are zero files to transfer, so flush anyway (need to get any in-memory hints, as well)
ColumnFamilyStore hintsCF = Keyspace.open(SystemKeyspace.NAME).getColumnFamilyStore(SystemKeyspace.HINTS);
FBUtilities.waitOnFuture(hintsCF.forceFlush());
#endif
// gather all live nodes in the cluster that aren't also leaving
auto candidates = get_local_storage_service().get_token_metadata().clone_after_all_left().get_all_endpoints();
auto beg = candidates.begin();
auto end = candidates.end();
auto remove_fn = [br = get_broadcast_address()] (const inet_address& ep) {
return ep == br || !gms::get_local_failure_detector().is_alive(ep);
};
candidates.erase(std::remove_if(beg, end, remove_fn), end);
if (candidates.empty()) {
logger.warn("Unable to stream hints since no live endpoints seen");
throw std::runtime_error("Unable to stream hints since no live endpoints seen");
} else {
// stream to the closest peer as chosen by the snitch
auto& snitch = locator::i_endpoint_snitch::get_local_snitch_ptr();
snitch->sort_by_proximity(get_broadcast_address(), candidates);
auto hints_destination_host = candidates.front();
auto preferred = hints_destination_host; // FIXME: SystemKeyspace.getPreferredIP(hints_destination_host);
// stream all hints -- range list will be a singleton of "the entire ring"
auto t = dht::global_partitioner().get_minimum_token();
std::vector<range<token>> ranges = {range<token>(t)};
streaming::stream_plan sp("Hints", true);
std::vector<sstring> column_families = { db::system_keyspace::HINTS };
auto keyspace = db::system_keyspace::NAME;
sp.transfer_ranges(hints_destination_host, preferred, keyspace, ranges, column_families);
return sp.execute();
}
}
} // namespace service

View File

@@ -54,6 +54,7 @@
#include "utils/fb_utilities.hh"
#include "database.hh"
#include <seastar/core/distributed.hh>
#include "streaming/stream_state.hh"
namespace service {
@@ -807,6 +808,9 @@ private:
#endif
}
public:
void confirm_replication(inet_address node);
private:
/**
@@ -814,30 +818,8 @@ private:
*
* @param remote node to send notification to
*/
void send_replication_notification(inet_address remote) {
#if 0
// notify the remote token
MessageOut msg = new MessageOut(MessagingService.Verb.REPLICATION_FINISHED);
IFailureDetector failureDetector = FailureDetector.instance;
if (logger.isDebugEnabled())
logger.debug("Notifying {} of replication completion\n", remote);
while (failureDetector.isAlive(remote))
{
AsyncOneResponse iar = MessagingService.instance().sendRR(msg, remote);
try
{
iar.get(DatabaseDescriptor.getRpcTimeout(), TimeUnit.MILLISECONDS);
return; // done
}
catch(TimeoutException e)
{
// try again
}
}
#endif
}
future<> send_replication_notification(inet_address remote);
private:
/**
* Called when an endpoint is removed from the ring. This function checks
* whether this node becomes responsible for new ranges as a
@@ -1910,64 +1892,12 @@ public:
future<> decommission();
#if 0
private void leaveRing()
{
SystemKeyspace.setBootstrapState(SystemKeyspace.BootstrapState.NEEDS_BOOTSTRAP);
_token_metadata.removeEndpoint(FBUtilities.getBroadcastAddress());
PendingRangeCalculatorService.instance.update();
Gossiper.instance.addLocalApplicationState(ApplicationState.STATUS, valueFactory.left(getLocalTokens(),Gossiper.computeExpireTime()));
int delay = Math.max(RING_DELAY, Gossiper.intervalInMillis * 2);
logger.info("Announcing that I have left the ring for {}ms", delay);
Uninterruptibles.sleepUninterruptibly(delay, TimeUnit.MILLISECONDS);
}
#endif
private:
future<> unbootstrap();
void leave_ring();
void unbootstrap();
future<streaming::stream_state> stream_hints();
#if 0
private Future<StreamState> streamHints()
{
// StreamPlan will not fail if there are zero files to transfer, so flush anyway (need to get any in-memory hints, as well)
ColumnFamilyStore hintsCF = Keyspace.open(SystemKeyspace.NAME).getColumnFamilyStore(SystemKeyspace.HINTS);
FBUtilities.waitOnFuture(hintsCF.forceFlush());
// gather all live nodes in the cluster that aren't also leaving
List<InetAddress> candidates = new ArrayList<>(StorageService.instance.getTokenMetadata().cloneAfterAllLeft().getAllEndpoints());
candidates.remove(FBUtilities.getBroadcastAddress());
for (Iterator<InetAddress> iter = candidates.iterator(); iter.hasNext(); )
{
InetAddress address = iter.next();
if (!FailureDetector.instance.isAlive(address))
iter.remove();
}
if (candidates.isEmpty())
{
logger.warn("Unable to stream hints since no live endpoints seen");
return Futures.immediateFuture(null);
}
else
{
// stream to the closest peer as chosen by the snitch
DatabaseDescriptor.getEndpointSnitch().sortByProximity(FBUtilities.getBroadcastAddress(), candidates);
InetAddress hintsDestinationHost = candidates.get(0);
InetAddress preferred = SystemKeyspace.getPreferredIP(hintsDestinationHost);
// stream all hints -- range list will be a singleton of "the entire ring"
Token token = StorageService.getPartitioner().getMinimumToken();
List<Range<Token>> ranges = Collections.singletonList(new Range<>(token, token));
return new StreamPlan("Hints").transferRanges(hintsDestinationHost,
preferred,
SystemKeyspace.NAME,
ranges,
SystemKeyspace.HINTS)
.execute();
}
}
public void move(String newToken) throws IOException
{
try
@@ -2240,22 +2170,6 @@ public:
*/
future<> remove_node(sstring host_id_string);
#if 0
public void confirmReplication(InetAddress node)
{
// replicatingNodes can be empty in the case where this node used to be a removal coordinator,
// but restarted before all 'replication finished' messages arrived. In that case, we'll
// still go ahead and acknowledge it.
if (!replicatingNodes.isEmpty())
{
replicatingNodes.remove(node);
}
else
{
logger.info("Received unexpected REPLICATION_FINISHED message from {}. Was this node recently a removal coordinator?", node);
}
}
#endif
future<sstring> get_operation_mode();
future<bool> is_starting();
@@ -2437,62 +2351,18 @@ public:
if (oldSnitch instanceof DynamicEndpointSnitch)
((DynamicEndpointSnitch)oldSnitch).unregisterMBean();
}
#endif
private:
/**
* Seed data to the endpoints that will be responsible for it at the future
*
* @param rangesToStreamByKeyspace keyspaces and data ranges with endpoints included for each
* @return async Future for whether stream was success
*/
private Future<StreamState> streamRanges(Map<String, Multimap<Range<Token>, InetAddress>> rangesToStreamByKeyspace)
{
// First, we build a list of ranges to stream to each host, per table
Map<String, Map<InetAddress, List<Range<Token>>>> sessionsToStreamByKeyspace = new HashMap<>();
for (Map.Entry<String, Multimap<Range<Token>, InetAddress>> entry : rangesToStreamByKeyspace.entrySet())
{
String keyspace = entry.getKey();
Multimap<Range<Token>, InetAddress> rangesWithEndpoints = entry.getValue();
if (rangesWithEndpoints.isEmpty())
continue;
Map<InetAddress, List<Range<Token>>> rangesPerEndpoint = new HashMap<>();
for (Map.Entry<Range<Token>, InetAddress> endPointEntry : rangesWithEndpoints.entries())
{
Range<Token> range = endPointEntry.getKey();
InetAddress endpoint = endPointEntry.getValue();
List<Range<Token>> curRanges = rangesPerEndpoint.get(endpoint);
if (curRanges == null)
{
curRanges = new LinkedList<>();
rangesPerEndpoint.put(endpoint, curRanges);
}
curRanges.add(range);
}
sessionsToStreamByKeyspace.put(keyspace, rangesPerEndpoint);
}
StreamPlan streamPlan = new StreamPlan("Unbootstrap");
for (Map.Entry<String, Map<InetAddress, List<Range<Token>>>> entry : sessionsToStreamByKeyspace.entrySet())
{
String keyspaceName = entry.getKey();
Map<InetAddress, List<Range<Token>>> rangesPerEndpoint = entry.getValue();
for (Map.Entry<InetAddress, List<Range<Token>>> rangesEntry : rangesPerEndpoint.entrySet())
{
List<Range<Token>> ranges = rangesEntry.getValue();
InetAddress newEndpoint = rangesEntry.getKey();
InetAddress preferred = SystemKeyspace.getPreferredIP(newEndpoint);
// TODO each call to transferRanges re-flushes, this is potentially a lot of waste
streamPlan.transferRanges(newEndpoint, preferred, keyspaceName, ranges);
}
}
return streamPlan.execute();
}
future<streaming::stream_state> stream_ranges(std::unordered_map<sstring, std::unordered_multimap<range<token>, inet_address>> ranges_to_stream_by_keyspace);
#if 0
/**
* Calculate pair of ranges to stream/fetch for given two range collections
* (current ranges for keyspace and ranges after move to new token)
@@ -2600,6 +2470,7 @@ public:
return loader.stream();
}
#endif
public:
int32_t get_exception_count();
#if 0
public void rescheduleFailedDeletions()