From 2f86feb581d35b1fa01704ac16172863cef0531f Mon Sep 17 00:00:00 2001 From: Asias He Date: Wed, 21 Oct 2015 08:51:20 +0800 Subject: [PATCH 01/15] storage_service: Move send_replication_notification to source file --- service/storage_service.cc | 23 +++++++++++++++++++++++ service/storage_service.hh | 25 +------------------------ 2 files changed, 24 insertions(+), 24 deletions(-) diff --git a/service/storage_service.cc b/service/storage_service.cc index b078bd4795..0ab51f8151 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -1889,4 +1889,27 @@ void storage_service::excise(std::unordered_set tokens, inet_address endp excise(tokens, endpoint); } +void storage_service::send_replication_notification(inet_address remote) { +#if 0 + // notify the remote token + MessageOut msg = new MessageOut(MessagingService.Verb.REPLICATION_FINISHED); + IFailureDetector failureDetector = FailureDetector.instance; + if (logger.isDebugEnabled()) + logger.debug("Notifying {} of replication completion\n", remote); + while (failureDetector.isAlive(remote)) + { + AsyncOneResponse iar = MessagingService.instance().sendRR(msg, remote); + try + { + iar.get(DatabaseDescriptor.getRpcTimeout(), TimeUnit.MILLISECONDS); + return; // done + } + catch(TimeoutException e) + { + // try again + } + } +#endif +} + } // namespace service diff --git a/service/storage_service.hh b/service/storage_service.hh index 3e07713c82..7af59f5a9d 100644 --- a/service/storage_service.hh +++ b/service/storage_service.hh @@ -814,30 +814,7 @@ private: * * @param remote node to send notification to */ - void send_replication_notification(inet_address remote) { -#if 0 - // notify the remote token - MessageOut msg = new MessageOut(MessagingService.Verb.REPLICATION_FINISHED); - IFailureDetector failureDetector = FailureDetector.instance; - if (logger.isDebugEnabled()) - logger.debug("Notifying {} of replication completion\n", remote); - while (failureDetector.isAlive(remote)) - { - AsyncOneResponse iar = MessagingService.instance().sendRR(msg, remote); - try - { - iar.get(DatabaseDescriptor.getRpcTimeout(), TimeUnit.MILLISECONDS); - return; // done - } - catch(TimeoutException e) - { - // try again - } - } -#endif - } - -private: + void send_replication_notification(inet_address remote); /** * Called when an endpoint is removed from the ring. This function checks * whether this node becomes responsible for new ranges as a From d1eaccd234d937b677bbdf2733ebbe81cbc404ef Mon Sep 17 00:00:00 2001 From: Asias He Date: Wed, 21 Oct 2015 09:18:00 +0800 Subject: [PATCH 02/15] storage_service: Implement leave_ring Needed by unbootstrap. --- service/storage_service.cc | 14 ++++++++++++++ service/storage_service.hh | 15 +-------------- 2 files changed, 15 insertions(+), 14 deletions(-) diff --git a/service/storage_service.cc b/service/storage_service.cc index 0ab51f8151..e40fc92eec 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -1912,4 +1912,18 @@ void storage_service::send_replication_notification(inet_address remote) { #endif } +// Runs inside seastar::async context +void storage_service::leave_ring() { + db::system_keyspace::set_bootstrap_state(db::system_keyspace::bootstrap_state::NEEDS_BOOTSTRAP).get(); + _token_metadata.remove_endpoint(get_broadcast_address()); + get_local_pending_range_calculator_service().update().get(); + + auto& gossiper = gms::get_local_gossiper(); + auto expire_time = gossiper.compute_expire_time().time_since_epoch().count(); + gossiper.add_local_application_state(gms::application_state::STATUS, value_factory.left(get_local_tokens(), expire_time)); + auto delay = std::max(std::chrono::milliseconds(RING_DELAY), gms::gossiper::INTERVAL); + logger.info("Announcing that I have left the ring for {}ms", delay.count()); + sleep(delay).get(); +} + } // namespace service diff --git a/service/storage_service.hh b/service/storage_service.hh index 7af59f5a9d..9176f102fd 100644 --- a/service/storage_service.hh +++ b/service/storage_service.hh @@ -1887,23 +1887,10 @@ public: future<> decommission(); -#if 0 - private void leaveRing() - { - SystemKeyspace.setBootstrapState(SystemKeyspace.BootstrapState.NEEDS_BOOTSTRAP); - _token_metadata.removeEndpoint(FBUtilities.getBroadcastAddress()); - PendingRangeCalculatorService.instance.update(); - - Gossiper.instance.addLocalApplicationState(ApplicationState.STATUS, valueFactory.left(getLocalTokens(),Gossiper.computeExpireTime())); - int delay = Math.max(RING_DELAY, Gossiper.intervalInMillis * 2); - logger.info("Announcing that I have left the ring for {}ms", delay); - Uninterruptibles.sleepUninterruptibly(delay, TimeUnit.MILLISECONDS); - } -#endif private: + void leave_ring(); future<> unbootstrap(); #if 0 - private Future streamHints() { // StreamPlan will not fail if there are zero files to transfer, so flush anyway (need to get any in-memory hints, as well) From 3f5e9baa17d1be10883c2f170132baca47d9942e Mon Sep 17 00:00:00 2001 From: Asias He Date: Wed, 21 Oct 2015 09:45:57 +0800 Subject: [PATCH 03/15] storage_service: Implement stream_ranges Needed by unbootstrap. --- service/storage_service.cc | 39 +++++++++++++++++++++++++++ service/storage_service.hh | 54 +++++--------------------------------- 2 files changed, 45 insertions(+), 48 deletions(-) diff --git a/service/storage_service.cc b/service/storage_service.cc index e40fc92eec..57523a5811 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -1926,4 +1926,43 @@ void storage_service::leave_ring() { sleep(delay).get(); } +future +storage_service::stream_ranges(std::unordered_map, inet_address>> ranges_to_stream_by_keyspace) { + using stream_plan = streaming::stream_plan; + // First, we build a list of ranges to stream to each host, per table + std::unordered_map>>> sessions_to_stream_by_keyspace; + for (auto& entry : ranges_to_stream_by_keyspace) { + const auto& keyspace = entry.first; + auto& ranges_with_endpoints = entry.second; + + if (ranges_with_endpoints.empty()) { + continue; + } + + std::unordered_map>> ranges_per_endpoint; + for (auto& end_point_entry : ranges_with_endpoints) { + range r = end_point_entry.first; + inet_address endpoint = end_point_entry.second; + ranges_per_endpoint[endpoint].emplace_back(r); + } + sessions_to_stream_by_keyspace.emplace(keyspace, std::move(ranges_per_endpoint)); + } + stream_plan sp("Unbootstrap", true); + for (auto& entry : sessions_to_stream_by_keyspace) { + const auto& keyspace_name = entry.first; + // TODO: we can move to avoid copy of std::vector + auto& ranges_per_endpoint = entry.second; + + for (auto& ranges_entry : ranges_per_endpoint) { + auto& ranges = ranges_entry.second; + auto new_endpoint = ranges_entry.first; + auto preferred = new_endpoint; // FIXME: SystemKeyspace.getPreferredIP(newEndpoint); + + // TODO each call to transferRanges re-flushes, this is potentially a lot of waste + sp.transfer_ranges(new_endpoint, preferred, keyspace_name, ranges); + } + } + return sp.execute(); +} + } // namespace service diff --git a/service/storage_service.hh b/service/storage_service.hh index 9176f102fd..d183d1c5b2 100644 --- a/service/storage_service.hh +++ b/service/storage_service.hh @@ -54,6 +54,7 @@ #include "utils/fb_utilities.hh" #include "database.hh" #include +#include "streaming/stream_state.hh" namespace service { @@ -2401,62 +2402,18 @@ public: if (oldSnitch instanceof DynamicEndpointSnitch) ((DynamicEndpointSnitch)oldSnitch).unregisterMBean(); } +#endif +private: /** * Seed data to the endpoints that will be responsible for it at the future * * @param rangesToStreamByKeyspace keyspaces and data ranges with endpoints included for each * @return async Future for whether stream was success */ - private Future streamRanges(Map, InetAddress>> rangesToStreamByKeyspace) - { - // First, we build a list of ranges to stream to each host, per table - Map>>> sessionsToStreamByKeyspace = new HashMap<>(); - for (Map.Entry, InetAddress>> entry : rangesToStreamByKeyspace.entrySet()) - { - String keyspace = entry.getKey(); - Multimap, InetAddress> rangesWithEndpoints = entry.getValue(); - - if (rangesWithEndpoints.isEmpty()) - continue; - - Map>> rangesPerEndpoint = new HashMap<>(); - for (Map.Entry, InetAddress> endPointEntry : rangesWithEndpoints.entries()) - { - Range range = endPointEntry.getKey(); - InetAddress endpoint = endPointEntry.getValue(); - - List> curRanges = rangesPerEndpoint.get(endpoint); - if (curRanges == null) - { - curRanges = new LinkedList<>(); - rangesPerEndpoint.put(endpoint, curRanges); - } - curRanges.add(range); - } - - sessionsToStreamByKeyspace.put(keyspace, rangesPerEndpoint); - } - - StreamPlan streamPlan = new StreamPlan("Unbootstrap"); - for (Map.Entry>>> entry : sessionsToStreamByKeyspace.entrySet()) - { - String keyspaceName = entry.getKey(); - Map>> rangesPerEndpoint = entry.getValue(); - - for (Map.Entry>> rangesEntry : rangesPerEndpoint.entrySet()) - { - List> ranges = rangesEntry.getValue(); - InetAddress newEndpoint = rangesEntry.getKey(); - InetAddress preferred = SystemKeyspace.getPreferredIP(newEndpoint); - - // TODO each call to transferRanges re-flushes, this is potentially a lot of waste - streamPlan.transferRanges(newEndpoint, preferred, keyspaceName, ranges); - } - } - return streamPlan.execute(); - } + future stream_ranges(std::unordered_map, inet_address>> ranges_to_stream_by_keyspace); +#if 0 /** * Calculate pair of ranges to stream/fetch for given two range collections * (current ranges for keyspace and ranges after move to new token) @@ -2564,6 +2521,7 @@ public: return loader.stream(); } #endif +public: int32_t get_exception_count(); #if 0 public void rescheduleFailedDeletions() From 8a9374b33127c3222fac267e3694ebcee5c3fb7d Mon Sep 17 00:00:00 2001 From: Asias He Date: Wed, 21 Oct 2015 10:29:25 +0800 Subject: [PATCH 04/15] storage_service: Implement stream_hints Needed by unbootstrap. --- service/storage_service.cc | 40 +++++++++++++++++++++++++++++++++++++ service/storage_service.hh | 41 +------------------------------------- 2 files changed, 41 insertions(+), 40 deletions(-) diff --git a/service/storage_service.cc b/service/storage_service.cc index 57523a5811..923f2c1acf 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -1965,4 +1965,44 @@ storage_service::stream_ranges(std::unordered_map storage_service::stream_hints() { + // FIXME: flush hits column family +#if 0 + // StreamPlan will not fail if there are zero files to transfer, so flush anyway (need to get any in-memory hints, as well) + ColumnFamilyStore hintsCF = Keyspace.open(SystemKeyspace.NAME).getColumnFamilyStore(SystemKeyspace.HINTS); + FBUtilities.waitOnFuture(hintsCF.forceFlush()); +#endif + + // gather all live nodes in the cluster that aren't also leaving + auto candidates = get_local_storage_service().get_token_metadata().clone_after_all_left().get_all_endpoints(); + auto beg = candidates.begin(); + auto end = candidates.end(); + auto remove_fn = [br = get_broadcast_address()] (const inet_address& ep) { + return ep == br || !gms::get_local_failure_detector().is_alive(ep); + }; + candidates.erase(std::remove_if(beg, end, remove_fn), end); + + if (candidates.empty()) { + logger.warn("Unable to stream hints since no live endpoints seen"); + throw std::runtime_error("Unable to stream hints since no live endpoints seen"); + } else { + // stream to the closest peer as chosen by the snitch + auto& snitch = locator::i_endpoint_snitch::get_local_snitch_ptr(); + + snitch->sort_by_proximity(get_broadcast_address(), candidates); + auto hints_destination_host = candidates.front(); + auto preferred = hints_destination_host; // FIXME: SystemKeyspace.getPreferredIP(hints_destination_host); + + // stream all hints -- range list will be a singleton of "the entire ring" + auto t = dht::global_partitioner().get_minimum_token(); + std::vector> ranges = {range(t)}; + + streaming::stream_plan sp("Hints", true); + std::vector column_families = { db::system_keyspace::HINTS }; + auto keyspace = db::system_keyspace::NAME; + sp.transfer_ranges(hints_destination_host, preferred, keyspace, ranges, column_families); + return sp.execute(); + } +} + } // namespace service diff --git a/service/storage_service.hh b/service/storage_service.hh index d183d1c5b2..e92159e16e 100644 --- a/service/storage_service.hh +++ b/service/storage_service.hh @@ -1891,47 +1891,8 @@ public: private: void leave_ring(); future<> unbootstrap(); + future stream_hints(); #if 0 - private Future streamHints() - { - // StreamPlan will not fail if there are zero files to transfer, so flush anyway (need to get any in-memory hints, as well) - ColumnFamilyStore hintsCF = Keyspace.open(SystemKeyspace.NAME).getColumnFamilyStore(SystemKeyspace.HINTS); - FBUtilities.waitOnFuture(hintsCF.forceFlush()); - - // gather all live nodes in the cluster that aren't also leaving - List candidates = new ArrayList<>(StorageService.instance.getTokenMetadata().cloneAfterAllLeft().getAllEndpoints()); - candidates.remove(FBUtilities.getBroadcastAddress()); - for (Iterator iter = candidates.iterator(); iter.hasNext(); ) - { - InetAddress address = iter.next(); - if (!FailureDetector.instance.isAlive(address)) - iter.remove(); - } - - if (candidates.isEmpty()) - { - logger.warn("Unable to stream hints since no live endpoints seen"); - return Futures.immediateFuture(null); - } - else - { - // stream to the closest peer as chosen by the snitch - DatabaseDescriptor.getEndpointSnitch().sortByProximity(FBUtilities.getBroadcastAddress(), candidates); - InetAddress hintsDestinationHost = candidates.get(0); - InetAddress preferred = SystemKeyspace.getPreferredIP(hintsDestinationHost); - - // stream all hints -- range list will be a singleton of "the entire ring" - Token token = StorageService.getPartitioner().getMinimumToken(); - List> ranges = Collections.singletonList(new Range<>(token, token)); - - return new StreamPlan("Hints").transferRanges(hintsDestinationHost, - preferred, - SystemKeyspace.NAME, - ranges, - SystemKeyspace.HINTS) - .execute(); - } - } public void move(String newToken) throws IOException { From 6f8f4816a525a7d18ffdce69b419ca4b529b6a14 Mon Sep 17 00:00:00 2001 From: Asias He Date: Wed, 21 Oct 2015 10:57:09 +0800 Subject: [PATCH 05/15] storage_service: Implement unbootstrap All the missing functions for unbootstrap are ready, we can implement unbootstrap now. --- service/storage_service.cc | 75 +++++++++++++++++++------------------- service/storage_service.hh | 2 +- 2 files changed, 38 insertions(+), 39 deletions(-) diff --git a/service/storage_service.cc b/service/storage_service.cc index 923f2c1acf..f000fff4be 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -1458,15 +1458,15 @@ future<> storage_service::decommission() { set_mode(mode::LEAVING, sprint("sleeping %s ms for batch processing and pending range setup", timeout), true); sleep(std::chrono::milliseconds(timeout)).get(); - unbootstrap().finally([this] { - // FIXME: proper shutdown - // shutdownClientServers(); - gms::get_local_gossiper().stop(); - // MessagingService.instance().shutdown(); - // StageManager.shutdownNow(); - set_mode(mode::DECOMMISSIONED, true); - // let op be responsible for killing the process - }).get(); + unbootstrap(); + + // FIXME: proper shutdown + // shutdownClientServers(); + gms::get_local_gossiper().stop(); + // MessagingService.instance().shutdown(); + // StageManager.shutdownNow(); + set_mode(mode::DECOMMISSIONED, true); + // let op be responsible for killing the process }); } @@ -1766,27 +1766,29 @@ std::unordered_multimap, inet_address> storage_service::get_changed return changed_ranges; } -future<> storage_service::unbootstrap() { - return make_ready_future<>(); -#if 0 - Map, InetAddress>> rangesToStream = new HashMap<>(); +// Runs inside seastar::async context +void storage_service::unbootstrap() { + std::unordered_map, inet_address>> ranges_to_stream; - for (String keyspaceName : Schema.instance.getNonSystemKeyspaces()) - { - Multimap, InetAddress> rangesMM = getChangedRangesForLeaving(keyspaceName, FBUtilities.getBroadcastAddress()); - - if (logger.isDebugEnabled()) - logger.debug("Ranges needing transfer are [{}]", StringUtils.join(rangesMM.keySet(), ",")); - - rangesToStream.put(keyspaceName, rangesMM); + auto non_system_keyspaces = _db.local().get_non_system_keyspaces(); + for (const auto& keyspace_name : non_system_keyspaces) { + auto ranges_mm = get_changed_ranges_for_leaving(keyspace_name, get_broadcast_address()); + if (logger.is_enabled(logging::log_level::debug)) { + std::vector> ranges; + for (auto& x : ranges_mm) { + ranges.push_back(x.first); + } + logger.debug("Ranges needing transfer are [{}]", ranges); + } + ranges_to_stream.emplace(keyspace_name, std::move(ranges_mm)); } - setMode(Mode.LEAVING, "replaying batch log and streaming data to other nodes", true); + set_mode(mode::LEAVING, "replaying batch log and streaming data to other nodes", true); // Start with BatchLog replay, which may create hints but no writes since this is no longer a valid endpoint. - Future batchlogReplay = BatchlogManager.instance.startBatchlogReplay(); - Future streamSuccess = streamRanges(rangesToStream); - + // FIXME: Future batchlogReplay = BatchlogManager.instance.startBatchlogReplay(); + auto stream_success = stream_ranges(ranges_to_stream); +#if 0 // Wait for batch log to complete before streaming hints. logger.debug("waiting for batch log processing."); try @@ -1797,26 +1799,23 @@ future<> storage_service::unbootstrap() { { throw new RuntimeException(e); } +#endif - setMode(Mode.LEAVING, "streaming hints to other nodes", true); + set_mode(mode::LEAVING, "streaming hints to other nodes", true); - Future hintsSuccess = streamHints(); + auto hints_success = stream_hints(); // wait for the transfer runnables to signal the latch. logger.debug("waiting for stream acks."); - try - { - streamSuccess.get(); - hintsSuccess.get(); - } - catch (ExecutionException | InterruptedException e) - { - throw new RuntimeException(e); + try { + auto stream_state = stream_success.get0(); + auto hints_state = hints_success.get0(); + } catch (...) { + logger.warn("unbootstrap fails to stream : {}", std::current_exception()); + throw; } logger.debug("stream acks all received."); - leaveRing(); - onFinish.run(); -#endif + leave_ring(); } future<> storage_service::restore_replica_count(inet_address endpoint, inet_address notify_endpoint) { diff --git a/service/storage_service.hh b/service/storage_service.hh index e92159e16e..4918905660 100644 --- a/service/storage_service.hh +++ b/service/storage_service.hh @@ -1890,7 +1890,7 @@ public: private: void leave_ring(); - future<> unbootstrap(); + void unbootstrap(); future stream_hints(); #if 0 From 434a9e211e6225ac04f788a9da92308d243fa11e Mon Sep 17 00:00:00 2001 From: Asias He Date: Wed, 21 Oct 2015 14:49:12 +0800 Subject: [PATCH 06/15] storage_service: Kill one FIXME in handle_state_bootstrap It is already fixed. --- service/storage_service.cc | 1 - 1 file changed, 1 deletion(-) diff --git a/service/storage_service.cc b/service/storage_service.cc index f000fff4be..8ce0551308 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -424,7 +424,6 @@ void storage_service::handle_state_bootstrap(inet_address endpoint) { } _token_metadata.add_bootstrap_tokens(tokens, endpoint); - // FIXME get_local_pending_range_calculator_service().update().get(); auto& gossiper = gms::get_local_gossiper(); From d3120b3c2f490fd82fcf94d922dffa71a554ef86 Mon Sep 17 00:00:00 2001 From: Asias He Date: Wed, 21 Oct 2015 14:50:08 +0800 Subject: [PATCH 07/15] storage_service: Complete is_replacing logic in handle_state_normal --- service/storage_service.cc | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/service/storage_service.cc b/service/storage_service.cc index 8ce0551308..8aac3fb48d 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -453,10 +453,10 @@ void storage_service::handle_state_normal(inet_address endpoint) { if (gossiper.uses_host_id(endpoint)) { auto host_id = gossiper.get_host_id(endpoint); auto existing = _token_metadata.get_endpoint_for_host_id(host_id); - // if (DatabaseDescriptor.isReplacing() && - // Gossiper.instance.getEndpointStateForEndpoint(DatabaseDescriptor.getReplaceAddress()) != null && - // (hostId.equals(Gossiper.instance.getHostId(DatabaseDescriptor.getReplaceAddress())))) { - if (false) { + if (is_replacing() && + get_replace_address() && + gossiper.get_endpoint_state_for_endpoint(get_replace_address().value()) && + (host_id == gossiper.get_host_id(get_replace_address().value()))) { logger.warn("Not updating token metadata for {} because I am replacing it", endpoint); } else { if (existing && *existing != endpoint) { From 7d656fe1275e728db0da65ba8f05f2eb507553c7 Mon Sep 17 00:00:00 2001 From: Asias He Date: Wed, 21 Oct 2015 14:51:00 +0800 Subject: [PATCH 08/15] storage_service: Enable add_leaving_endpoint in handle_state_leaving --- service/storage_service.cc | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/service/storage_service.cc b/service/storage_service.cc index 8aac3fb48d..b57903f952 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -583,9 +583,7 @@ void storage_service::handle_state_leaving(inet_address endpoint) { // at this point the endpoint is certainly a member with this token, so let's proceed // normally -#if 0 - _token_metadata.addLeavingEndpoint(endpoint); -#endif + _token_metadata.add_leaving_endpoint(endpoint); get_local_pending_range_calculator_service().update().get(); } From 5b170d1ffe4233c98d0cb0b01452889b3df892a4 Mon Sep 17 00:00:00 2001 From: Asias He Date: Wed, 21 Oct 2015 14:51:49 +0800 Subject: [PATCH 09/15] storage_service: Complete handle_state_removing Implement #if 0'ed code. --- service/storage_service.cc | 22 ++++++++++++++-------- 1 file changed, 14 insertions(+), 8 deletions(-) diff --git a/service/storage_service.cc b/service/storage_service.cc index b57903f952..146333ca71 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -627,18 +627,24 @@ void storage_service::handle_state_removing(inet_address endpoint, std::vectorget_application_state(application_state::REMOVAL_COORDINATOR); + assert(value); + std::vector coordinator; + boost::split(coordinator, value->value, boost::is_any_of(sstring(versioned_value::DELIMITER_STR))); + assert(coordinator.size() == 2); + UUID host_id(coordinator[1]); // grab any data we are now responsible for and notify responsible node - restoreReplicaCount(endpoint, _token_metadata.getEndpointForHostId(hostId)); -#endif + auto ep = _token_metadata.get_endpoint_for_host_id(host_id); + assert(ep); + restore_replica_count(endpoint, ep.value()).get(); } } else { // now that the gossiper has told us about this nonexistent member, notify the gossiper to remove it if (sstring(gms::versioned_value::REMOVED_TOKEN) == pieces[0]) { From 4f57f0cdae53c8acfa66729356d1426d1b2cae93 Mon Sep 17 00:00:00 2001 From: Asias He Date: Wed, 21 Oct 2015 14:52:54 +0800 Subject: [PATCH 10/15] storage_service: Enable restore_replica_count in remove_node --- service/storage_service.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/service/storage_service.cc b/service/storage_service.cc index 146333ca71..d1bd7d54e3 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -1536,7 +1536,7 @@ future<> storage_service::remove_node(sstring host_id_string) { gossiper.advertise_removing(endpoint, host_id, local_host_id); // kick off streaming commands - // restoreReplicaCount(endpoint, myAddress); + restore_replica_count(endpoint, my_address).get(); // wait for ReplicationFinishedVerbHandler to signal we're done while (!_replicating_nodes.empty()) { From d903bd0dba13d6d85afeb0415637d585824a8c25 Mon Sep 17 00:00:00 2001 From: Asias He Date: Wed, 21 Oct 2015 14:53:20 +0800 Subject: [PATCH 11/15] storage_service: Implement on_leave_cluster in excise --- service/storage_service.cc | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/service/storage_service.cc b/service/storage_service.cc index d1bd7d54e3..063816466b 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -1877,12 +1877,12 @@ void storage_service::excise(std::unordered_set tokens, inet_address endp _token_metadata.remove_endpoint(endpoint); _token_metadata.remove_bootstrap_tokens(tokens); - // FIXME: IEndpointLifecycleSubscriber -#if 0 - for (IEndpointLifecycleSubscriber subscriber : lifecycleSubscribers) { - subscriber.onLeaveCluster(endpoint); - } -#endif + get_storage_service().invoke_on_all([endpoint] (auto&& ss) { + for (auto&& subscriber : ss._lifecycle_subscribers) { + subscriber->on_leave_cluster(endpoint); + } + }).get(); + get_local_pending_range_calculator_service().update().get(); } From 1965e8751bc6f2e55fe9424fe28208cbf9b38b5b Mon Sep 17 00:00:00 2001 From: Asias He Date: Wed, 21 Oct 2015 15:23:22 +0800 Subject: [PATCH 12/15] messaging_service: Add REPLICATION_FINISHED verb It is used to send replication finished message by storage_service when removing a node from a cluster. --- message/messaging_service.cc | 11 +++++++++++ message/messaging_service.hh | 5 +++++ 2 files changed, 16 insertions(+) diff --git a/message/messaging_service.cc b/message/messaging_service.cc index 8057ea3687..ee2f709c56 100644 --- a/message/messaging_service.cc +++ b/message/messaging_service.cc @@ -557,5 +557,16 @@ future<> messaging_service::send_truncate(shard_id id, std::chrono::milliseconds return send_message_timeout(this, net::messaging_verb::TRUNCATE, std::move(id), std::move(timeout), std::move(ks), std::move(cf)); } +// Wrapper for REPLICATION_FINISHED +void messaging_service::register_replication_finished(std::function (inet_address)>&& func) { + register_handler(this, messaging_verb::REPLICATION_FINISHED, std::move(func)); +} +void messaging_service::unregister_replication_finished() { + _rpc->unregister_handler(messaging_verb::REPLICATION_FINISHED); +} +future<> messaging_service::send_replication_finished(shard_id id, inet_address from) { + // FIXME: getRpcTimeout : conf.request_timeout_in_ms + return send_message_timeout(this, messaging_verb::REPLICATION_FINISHED, std::move(id), 10000ms, std::move(from)); +} } // namespace net diff --git a/message/messaging_service.hh b/message/messaging_service.hh index 5e9e1254fe..f5c4b88bb8 100644 --- a/message/messaging_service.hh +++ b/message/messaging_service.hh @@ -533,6 +533,11 @@ public: void unregister_truncate(); future<> send_truncate(shard_id, std::chrono::milliseconds, sstring, sstring); + // Wrapper for REPLICATION_FINISHED verb + void register_replication_finished(std::function (inet_address from)>&& func); + void unregister_replication_finished(); + future<> send_replication_finished(shard_id id, inet_address from); + public: // Return rpc::protocol::client for a shard which is a ip + cpuid pair. shared_ptr get_rpc_client(messaging_verb verb, shard_id id); From ffce7a7af8aed44f5b9c024a277ec74e9bcb6dae Mon Sep 17 00:00:00 2001 From: Asias He Date: Wed, 21 Oct 2015 15:35:54 +0800 Subject: [PATCH 13/15] storage_service: Implement confirm_replication --- service/storage_service.cc | 11 +++++++++++ service/storage_service.hh | 20 ++++---------------- 2 files changed, 15 insertions(+), 16 deletions(-) diff --git a/service/storage_service.cc b/service/storage_service.cc index 063816466b..b4171e2adb 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -1914,6 +1914,17 @@ void storage_service::send_replication_notification(inet_address remote) { #endif } +void storage_service::confirm_replication(inet_address node) { + // replicatingNodes can be empty in the case where this node used to be a removal coordinator, + // but restarted before all 'replication finished' messages arrived. In that case, we'll + // still go ahead and acknowledge it. + if (!_replicating_nodes.empty()) { + _replicating_nodes.erase(node); + } else { + logger.info("Received unexpected REPLICATION_FINISHED message from {}. Was this node recently a removal coordinator?", node); + } +} + // Runs inside seastar::async context void storage_service::leave_ring() { db::system_keyspace::set_bootstrap_state(db::system_keyspace::bootstrap_state::NEEDS_BOOTSTRAP).get(); diff --git a/service/storage_service.hh b/service/storage_service.hh index 4918905660..b17e92acb4 100644 --- a/service/storage_service.hh +++ b/service/storage_service.hh @@ -808,6 +808,9 @@ private: #endif } +public: + void confirm_replication(inet_address node); + private: /** @@ -816,6 +819,7 @@ private: * @param remote node to send notification to */ void send_replication_notification(inet_address remote); + /** * Called when an endpoint is removed from the ring. This function checks * whether this node becomes responsible for new ranges as a @@ -2166,22 +2170,6 @@ public: */ future<> remove_node(sstring host_id_string); -#if 0 - public void confirmReplication(InetAddress node) - { - // replicatingNodes can be empty in the case where this node used to be a removal coordinator, - // but restarted before all 'replication finished' messages arrived. In that case, we'll - // still go ahead and acknowledge it. - if (!replicatingNodes.isEmpty()) - { - replicatingNodes.remove(node); - } - else - { - logger.info("Received unexpected REPLICATION_FINISHED message from {}. Was this node recently a removal coordinator?", node); - } - } -#endif future get_operation_mode(); future is_starting(); From 56e55cd2728ef13a23e423eeb513ca5c7aadd09e Mon Sep 17 00:00:00 2001 From: Asias He Date: Wed, 21 Oct 2015 15:36:52 +0800 Subject: [PATCH 14/15] storage_proxy: Register replication_finished verb handler --- service/storage_proxy.cc | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/service/storage_proxy.cc b/service/storage_proxy.cc index 9e76aff56a..7cb3b34abd 100644 --- a/service/storage_proxy.cc +++ b/service/storage_proxy.cc @@ -2517,6 +2517,11 @@ void storage_proxy::init_messaging_service() { return sp._db.local().truncate(truncated_at, ksname, cfname); }); }); + + ms.register_replication_finished([] (gms::inet_address from) { + get_local_storage_service().confirm_replication(from); + return make_ready_future<>(); + }); } void storage_proxy::uninit_messaging_service() { @@ -2529,6 +2534,7 @@ void storage_proxy::uninit_messaging_service() { ms.unregister_read_mutation_data(); ms.unregister_read_digest(); ms.unregister_truncate(); + ms.unregister_replication_finished(); } // Merges reconcilable_result:s from different shards into one From 1271ad6894ee6ef788c73908fb408c1f6f9c8dcb Mon Sep 17 00:00:00 2001 From: Asias He Date: Wed, 21 Oct 2015 16:06:01 +0800 Subject: [PATCH 15/15] storage_service: Implement send_replication_notification --- service/storage_service.cc | 43 +++++++++++++++++++------------------- service/storage_service.hh | 2 +- 2 files changed, 23 insertions(+), 22 deletions(-) diff --git a/service/storage_service.cc b/service/storage_service.cc index b4171e2adb..f00ddb8254 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -1859,11 +1859,11 @@ future<> storage_service::restore_replica_count(inet_address endpoint, inet_addr return sp.execute().then_wrapped([this, notify_endpoint] (auto&& f) { try { auto state = f.get0(); - this->send_replication_notification(notify_endpoint); + return this->send_replication_notification(notify_endpoint); } catch (...) { logger.warn("Streaming to restore replica count failed: {}", std::current_exception()); // We still want to send the notification - this->send_replication_notification(notify_endpoint); + return this->send_replication_notification(notify_endpoint); } return make_ready_future<>(); }); @@ -1891,27 +1891,28 @@ void storage_service::excise(std::unordered_set tokens, inet_address endp excise(tokens, endpoint); } -void storage_service::send_replication_notification(inet_address remote) { -#if 0 +future<> storage_service::send_replication_notification(inet_address remote) { // notify the remote token - MessageOut msg = new MessageOut(MessagingService.Verb.REPLICATION_FINISHED); - IFailureDetector failureDetector = FailureDetector.instance; - if (logger.isDebugEnabled()) - logger.debug("Notifying {} of replication completion\n", remote); - while (failureDetector.isAlive(remote)) - { - AsyncOneResponse iar = MessagingService.instance().sendRR(msg, remote); - try - { - iar.get(DatabaseDescriptor.getRpcTimeout(), TimeUnit.MILLISECONDS); - return; // done + auto done = make_shared(false); + auto local = get_broadcast_address(); + logger.debug("Notifying {} of replication completion", remote); + return do_until( + [done, remote] { + return *done || !gms::get_local_failure_detector().is_alive(remote); + }, + [done, remote, local] { + auto& ms = net::get_local_messaging_service(); + net::shard_id id{remote, 0}; + return ms.send_replication_finished(id, local).then_wrapped([id, done] (auto&& f) { + try { + f.get(); + *done = true; + } catch (...) { + logger.warn("Fail to send REPLICATION_FINISHED to {}: {}", id, std::current_exception()); + } + }); } - catch(TimeoutException e) - { - // try again - } - } -#endif + ); } void storage_service::confirm_replication(inet_address node) { diff --git a/service/storage_service.hh b/service/storage_service.hh index b17e92acb4..e55da3e8e3 100644 --- a/service/storage_service.hh +++ b/service/storage_service.hh @@ -818,7 +818,7 @@ private: * * @param remote node to send notification to */ - void send_replication_notification(inet_address remote); + future<> send_replication_notification(inet_address remote); /** * Called when an endpoint is removed from the ring. This function checks