diff --git a/message/messaging_service.cc b/message/messaging_service.cc index 8057ea3687..ee2f709c56 100644 --- a/message/messaging_service.cc +++ b/message/messaging_service.cc @@ -557,5 +557,16 @@ future<> messaging_service::send_truncate(shard_id id, std::chrono::milliseconds return send_message_timeout(this, net::messaging_verb::TRUNCATE, std::move(id), std::move(timeout), std::move(ks), std::move(cf)); } +// Wrapper for REPLICATION_FINISHED +void messaging_service::register_replication_finished(std::function (inet_address)>&& func) { + register_handler(this, messaging_verb::REPLICATION_FINISHED, std::move(func)); +} +void messaging_service::unregister_replication_finished() { + _rpc->unregister_handler(messaging_verb::REPLICATION_FINISHED); +} +future<> messaging_service::send_replication_finished(shard_id id, inet_address from) { + // FIXME: getRpcTimeout : conf.request_timeout_in_ms + return send_message_timeout(this, messaging_verb::REPLICATION_FINISHED, std::move(id), 10000ms, std::move(from)); +} } // namespace net diff --git a/message/messaging_service.hh b/message/messaging_service.hh index 5e9e1254fe..f5c4b88bb8 100644 --- a/message/messaging_service.hh +++ b/message/messaging_service.hh @@ -533,6 +533,11 @@ public: void unregister_truncate(); future<> send_truncate(shard_id, std::chrono::milliseconds, sstring, sstring); + // Wrapper for REPLICATION_FINISHED verb + void register_replication_finished(std::function (inet_address from)>&& func); + void unregister_replication_finished(); + future<> send_replication_finished(shard_id id, inet_address from); + public: // Return rpc::protocol::client for a shard which is a ip + cpuid pair. shared_ptr get_rpc_client(messaging_verb verb, shard_id id); diff --git a/service/storage_proxy.cc b/service/storage_proxy.cc index 9e76aff56a..7cb3b34abd 100644 --- a/service/storage_proxy.cc +++ b/service/storage_proxy.cc @@ -2517,6 +2517,11 @@ void storage_proxy::init_messaging_service() { return sp._db.local().truncate(truncated_at, ksname, cfname); }); }); + + ms.register_replication_finished([] (gms::inet_address from) { + get_local_storage_service().confirm_replication(from); + return make_ready_future<>(); + }); } void storage_proxy::uninit_messaging_service() { @@ -2529,6 +2534,7 @@ void storage_proxy::uninit_messaging_service() { ms.unregister_read_mutation_data(); ms.unregister_read_digest(); ms.unregister_truncate(); + ms.unregister_replication_finished(); } // Merges reconcilable_result:s from different shards into one diff --git a/service/storage_service.cc b/service/storage_service.cc index b078bd4795..f00ddb8254 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -424,7 +424,6 @@ void storage_service::handle_state_bootstrap(inet_address endpoint) { } _token_metadata.add_bootstrap_tokens(tokens, endpoint); - // FIXME get_local_pending_range_calculator_service().update().get(); auto& gossiper = gms::get_local_gossiper(); @@ -454,10 +453,10 @@ void storage_service::handle_state_normal(inet_address endpoint) { if (gossiper.uses_host_id(endpoint)) { auto host_id = gossiper.get_host_id(endpoint); auto existing = _token_metadata.get_endpoint_for_host_id(host_id); - // if (DatabaseDescriptor.isReplacing() && - // Gossiper.instance.getEndpointStateForEndpoint(DatabaseDescriptor.getReplaceAddress()) != null && - // (hostId.equals(Gossiper.instance.getHostId(DatabaseDescriptor.getReplaceAddress())))) { - if (false) { + if (is_replacing() && + get_replace_address() && + gossiper.get_endpoint_state_for_endpoint(get_replace_address().value()) && + (host_id == gossiper.get_host_id(get_replace_address().value()))) { logger.warn("Not updating token metadata for {} because I am replacing it", endpoint); } else { if (existing && *existing != endpoint) { @@ -584,9 +583,7 @@ void storage_service::handle_state_leaving(inet_address endpoint) { // at this point the endpoint is certainly a member with this token, so let's proceed // normally -#if 0 - _token_metadata.addLeavingEndpoint(endpoint); -#endif + _token_metadata.add_leaving_endpoint(endpoint); get_local_pending_range_calculator_service().update().get(); } @@ -630,18 +627,24 @@ void storage_service::handle_state_removing(inet_address endpoint, std::vectorget_application_state(application_state::REMOVAL_COORDINATOR); + assert(value); + std::vector coordinator; + boost::split(coordinator, value->value, boost::is_any_of(sstring(versioned_value::DELIMITER_STR))); + assert(coordinator.size() == 2); + UUID host_id(coordinator[1]); // grab any data we are now responsible for and notify responsible node - restoreReplicaCount(endpoint, _token_metadata.getEndpointForHostId(hostId)); -#endif + auto ep = _token_metadata.get_endpoint_for_host_id(host_id); + assert(ep); + restore_replica_count(endpoint, ep.value()).get(); } } else { // now that the gossiper has told us about this nonexistent member, notify the gossiper to remove it if (sstring(gms::versioned_value::REMOVED_TOKEN) == pieces[0]) { @@ -1458,15 +1461,15 @@ future<> storage_service::decommission() { set_mode(mode::LEAVING, sprint("sleeping %s ms for batch processing and pending range setup", timeout), true); sleep(std::chrono::milliseconds(timeout)).get(); - unbootstrap().finally([this] { - // FIXME: proper shutdown - // shutdownClientServers(); - gms::get_local_gossiper().stop(); - // MessagingService.instance().shutdown(); - // StageManager.shutdownNow(); - set_mode(mode::DECOMMISSIONED, true); - // let op be responsible for killing the process - }).get(); + unbootstrap(); + + // FIXME: proper shutdown + // shutdownClientServers(); + gms::get_local_gossiper().stop(); + // MessagingService.instance().shutdown(); + // StageManager.shutdownNow(); + set_mode(mode::DECOMMISSIONED, true); + // let op be responsible for killing the process }); } @@ -1533,7 +1536,7 @@ future<> storage_service::remove_node(sstring host_id_string) { gossiper.advertise_removing(endpoint, host_id, local_host_id); // kick off streaming commands - // restoreReplicaCount(endpoint, myAddress); + restore_replica_count(endpoint, my_address).get(); // wait for ReplicationFinishedVerbHandler to signal we're done while (!_replicating_nodes.empty()) { @@ -1766,27 +1769,29 @@ std::unordered_multimap, inet_address> storage_service::get_changed return changed_ranges; } -future<> storage_service::unbootstrap() { - return make_ready_future<>(); -#if 0 - Map, InetAddress>> rangesToStream = new HashMap<>(); +// Runs inside seastar::async context +void storage_service::unbootstrap() { + std::unordered_map, inet_address>> ranges_to_stream; - for (String keyspaceName : Schema.instance.getNonSystemKeyspaces()) - { - Multimap, InetAddress> rangesMM = getChangedRangesForLeaving(keyspaceName, FBUtilities.getBroadcastAddress()); - - if (logger.isDebugEnabled()) - logger.debug("Ranges needing transfer are [{}]", StringUtils.join(rangesMM.keySet(), ",")); - - rangesToStream.put(keyspaceName, rangesMM); + auto non_system_keyspaces = _db.local().get_non_system_keyspaces(); + for (const auto& keyspace_name : non_system_keyspaces) { + auto ranges_mm = get_changed_ranges_for_leaving(keyspace_name, get_broadcast_address()); + if (logger.is_enabled(logging::log_level::debug)) { + std::vector> ranges; + for (auto& x : ranges_mm) { + ranges.push_back(x.first); + } + logger.debug("Ranges needing transfer are [{}]", ranges); + } + ranges_to_stream.emplace(keyspace_name, std::move(ranges_mm)); } - setMode(Mode.LEAVING, "replaying batch log and streaming data to other nodes", true); + set_mode(mode::LEAVING, "replaying batch log and streaming data to other nodes", true); // Start with BatchLog replay, which may create hints but no writes since this is no longer a valid endpoint. - Future batchlogReplay = BatchlogManager.instance.startBatchlogReplay(); - Future streamSuccess = streamRanges(rangesToStream); - + // FIXME: Future batchlogReplay = BatchlogManager.instance.startBatchlogReplay(); + auto stream_success = stream_ranges(ranges_to_stream); +#if 0 // Wait for batch log to complete before streaming hints. logger.debug("waiting for batch log processing."); try @@ -1797,26 +1802,23 @@ future<> storage_service::unbootstrap() { { throw new RuntimeException(e); } +#endif - setMode(Mode.LEAVING, "streaming hints to other nodes", true); + set_mode(mode::LEAVING, "streaming hints to other nodes", true); - Future hintsSuccess = streamHints(); + auto hints_success = stream_hints(); // wait for the transfer runnables to signal the latch. logger.debug("waiting for stream acks."); - try - { - streamSuccess.get(); - hintsSuccess.get(); - } - catch (ExecutionException | InterruptedException e) - { - throw new RuntimeException(e); + try { + auto stream_state = stream_success.get0(); + auto hints_state = hints_success.get0(); + } catch (...) { + logger.warn("unbootstrap fails to stream : {}", std::current_exception()); + throw; } logger.debug("stream acks all received."); - leaveRing(); - onFinish.run(); -#endif + leave_ring(); } future<> storage_service::restore_replica_count(inet_address endpoint, inet_address notify_endpoint) { @@ -1857,11 +1859,11 @@ future<> storage_service::restore_replica_count(inet_address endpoint, inet_addr return sp.execute().then_wrapped([this, notify_endpoint] (auto&& f) { try { auto state = f.get0(); - this->send_replication_notification(notify_endpoint); + return this->send_replication_notification(notify_endpoint); } catch (...) { logger.warn("Streaming to restore replica count failed: {}", std::current_exception()); // We still want to send the notification - this->send_replication_notification(notify_endpoint); + return this->send_replication_notification(notify_endpoint); } return make_ready_future<>(); }); @@ -1875,12 +1877,12 @@ void storage_service::excise(std::unordered_set tokens, inet_address endp _token_metadata.remove_endpoint(endpoint); _token_metadata.remove_bootstrap_tokens(tokens); - // FIXME: IEndpointLifecycleSubscriber -#if 0 - for (IEndpointLifecycleSubscriber subscriber : lifecycleSubscribers) { - subscriber.onLeaveCluster(endpoint); - } -#endif + get_storage_service().invoke_on_all([endpoint] (auto&& ss) { + for (auto&& subscriber : ss._lifecycle_subscribers) { + subscriber->on_leave_cluster(endpoint); + } + }).get(); + get_local_pending_range_calculator_service().update().get(); } @@ -1889,4 +1891,132 @@ void storage_service::excise(std::unordered_set tokens, inet_address endp excise(tokens, endpoint); } +future<> storage_service::send_replication_notification(inet_address remote) { + // notify the remote token + auto done = make_shared(false); + auto local = get_broadcast_address(); + logger.debug("Notifying {} of replication completion", remote); + return do_until( + [done, remote] { + return *done || !gms::get_local_failure_detector().is_alive(remote); + }, + [done, remote, local] { + auto& ms = net::get_local_messaging_service(); + net::shard_id id{remote, 0}; + return ms.send_replication_finished(id, local).then_wrapped([id, done] (auto&& f) { + try { + f.get(); + *done = true; + } catch (...) { + logger.warn("Fail to send REPLICATION_FINISHED to {}: {}", id, std::current_exception()); + } + }); + } + ); +} + +void storage_service::confirm_replication(inet_address node) { + // replicatingNodes can be empty in the case where this node used to be a removal coordinator, + // but restarted before all 'replication finished' messages arrived. In that case, we'll + // still go ahead and acknowledge it. + if (!_replicating_nodes.empty()) { + _replicating_nodes.erase(node); + } else { + logger.info("Received unexpected REPLICATION_FINISHED message from {}. Was this node recently a removal coordinator?", node); + } +} + +// Runs inside seastar::async context +void storage_service::leave_ring() { + db::system_keyspace::set_bootstrap_state(db::system_keyspace::bootstrap_state::NEEDS_BOOTSTRAP).get(); + _token_metadata.remove_endpoint(get_broadcast_address()); + get_local_pending_range_calculator_service().update().get(); + + auto& gossiper = gms::get_local_gossiper(); + auto expire_time = gossiper.compute_expire_time().time_since_epoch().count(); + gossiper.add_local_application_state(gms::application_state::STATUS, value_factory.left(get_local_tokens(), expire_time)); + auto delay = std::max(std::chrono::milliseconds(RING_DELAY), gms::gossiper::INTERVAL); + logger.info("Announcing that I have left the ring for {}ms", delay.count()); + sleep(delay).get(); +} + +future +storage_service::stream_ranges(std::unordered_map, inet_address>> ranges_to_stream_by_keyspace) { + using stream_plan = streaming::stream_plan; + // First, we build a list of ranges to stream to each host, per table + std::unordered_map>>> sessions_to_stream_by_keyspace; + for (auto& entry : ranges_to_stream_by_keyspace) { + const auto& keyspace = entry.first; + auto& ranges_with_endpoints = entry.second; + + if (ranges_with_endpoints.empty()) { + continue; + } + + std::unordered_map>> ranges_per_endpoint; + for (auto& end_point_entry : ranges_with_endpoints) { + range r = end_point_entry.first; + inet_address endpoint = end_point_entry.second; + ranges_per_endpoint[endpoint].emplace_back(r); + } + sessions_to_stream_by_keyspace.emplace(keyspace, std::move(ranges_per_endpoint)); + } + stream_plan sp("Unbootstrap", true); + for (auto& entry : sessions_to_stream_by_keyspace) { + const auto& keyspace_name = entry.first; + // TODO: we can move to avoid copy of std::vector + auto& ranges_per_endpoint = entry.second; + + for (auto& ranges_entry : ranges_per_endpoint) { + auto& ranges = ranges_entry.second; + auto new_endpoint = ranges_entry.first; + auto preferred = new_endpoint; // FIXME: SystemKeyspace.getPreferredIP(newEndpoint); + + // TODO each call to transferRanges re-flushes, this is potentially a lot of waste + sp.transfer_ranges(new_endpoint, preferred, keyspace_name, ranges); + } + } + return sp.execute(); +} + +future storage_service::stream_hints() { + // FIXME: flush hits column family +#if 0 + // StreamPlan will not fail if there are zero files to transfer, so flush anyway (need to get any in-memory hints, as well) + ColumnFamilyStore hintsCF = Keyspace.open(SystemKeyspace.NAME).getColumnFamilyStore(SystemKeyspace.HINTS); + FBUtilities.waitOnFuture(hintsCF.forceFlush()); +#endif + + // gather all live nodes in the cluster that aren't also leaving + auto candidates = get_local_storage_service().get_token_metadata().clone_after_all_left().get_all_endpoints(); + auto beg = candidates.begin(); + auto end = candidates.end(); + auto remove_fn = [br = get_broadcast_address()] (const inet_address& ep) { + return ep == br || !gms::get_local_failure_detector().is_alive(ep); + }; + candidates.erase(std::remove_if(beg, end, remove_fn), end); + + if (candidates.empty()) { + logger.warn("Unable to stream hints since no live endpoints seen"); + throw std::runtime_error("Unable to stream hints since no live endpoints seen"); + } else { + // stream to the closest peer as chosen by the snitch + auto& snitch = locator::i_endpoint_snitch::get_local_snitch_ptr(); + + snitch->sort_by_proximity(get_broadcast_address(), candidates); + auto hints_destination_host = candidates.front(); + auto preferred = hints_destination_host; // FIXME: SystemKeyspace.getPreferredIP(hints_destination_host); + + // stream all hints -- range list will be a singleton of "the entire ring" + auto t = dht::global_partitioner().get_minimum_token(); + std::vector> ranges = {range(t)}; + + streaming::stream_plan sp("Hints", true); + std::vector column_families = { db::system_keyspace::HINTS }; + auto keyspace = db::system_keyspace::NAME; + sp.transfer_ranges(hints_destination_host, preferred, keyspace, ranges, column_families); + return sp.execute(); + } +} + } // namespace service diff --git a/service/storage_service.hh b/service/storage_service.hh index 3e07713c82..e55da3e8e3 100644 --- a/service/storage_service.hh +++ b/service/storage_service.hh @@ -54,6 +54,7 @@ #include "utils/fb_utilities.hh" #include "database.hh" #include +#include "streaming/stream_state.hh" namespace service { @@ -807,6 +808,9 @@ private: #endif } +public: + void confirm_replication(inet_address node); + private: /** @@ -814,30 +818,8 @@ private: * * @param remote node to send notification to */ - void send_replication_notification(inet_address remote) { -#if 0 - // notify the remote token - MessageOut msg = new MessageOut(MessagingService.Verb.REPLICATION_FINISHED); - IFailureDetector failureDetector = FailureDetector.instance; - if (logger.isDebugEnabled()) - logger.debug("Notifying {} of replication completion\n", remote); - while (failureDetector.isAlive(remote)) - { - AsyncOneResponse iar = MessagingService.instance().sendRR(msg, remote); - try - { - iar.get(DatabaseDescriptor.getRpcTimeout(), TimeUnit.MILLISECONDS); - return; // done - } - catch(TimeoutException e) - { - // try again - } - } -#endif - } + future<> send_replication_notification(inet_address remote); -private: /** * Called when an endpoint is removed from the ring. This function checks * whether this node becomes responsible for new ranges as a @@ -1910,64 +1892,12 @@ public: future<> decommission(); -#if 0 - private void leaveRing() - { - SystemKeyspace.setBootstrapState(SystemKeyspace.BootstrapState.NEEDS_BOOTSTRAP); - _token_metadata.removeEndpoint(FBUtilities.getBroadcastAddress()); - PendingRangeCalculatorService.instance.update(); - - Gossiper.instance.addLocalApplicationState(ApplicationState.STATUS, valueFactory.left(getLocalTokens(),Gossiper.computeExpireTime())); - int delay = Math.max(RING_DELAY, Gossiper.intervalInMillis * 2); - logger.info("Announcing that I have left the ring for {}ms", delay); - Uninterruptibles.sleepUninterruptibly(delay, TimeUnit.MILLISECONDS); - } -#endif private: - future<> unbootstrap(); + void leave_ring(); + void unbootstrap(); + future stream_hints(); #if 0 - private Future streamHints() - { - // StreamPlan will not fail if there are zero files to transfer, so flush anyway (need to get any in-memory hints, as well) - ColumnFamilyStore hintsCF = Keyspace.open(SystemKeyspace.NAME).getColumnFamilyStore(SystemKeyspace.HINTS); - FBUtilities.waitOnFuture(hintsCF.forceFlush()); - - // gather all live nodes in the cluster that aren't also leaving - List candidates = new ArrayList<>(StorageService.instance.getTokenMetadata().cloneAfterAllLeft().getAllEndpoints()); - candidates.remove(FBUtilities.getBroadcastAddress()); - for (Iterator iter = candidates.iterator(); iter.hasNext(); ) - { - InetAddress address = iter.next(); - if (!FailureDetector.instance.isAlive(address)) - iter.remove(); - } - - if (candidates.isEmpty()) - { - logger.warn("Unable to stream hints since no live endpoints seen"); - return Futures.immediateFuture(null); - } - else - { - // stream to the closest peer as chosen by the snitch - DatabaseDescriptor.getEndpointSnitch().sortByProximity(FBUtilities.getBroadcastAddress(), candidates); - InetAddress hintsDestinationHost = candidates.get(0); - InetAddress preferred = SystemKeyspace.getPreferredIP(hintsDestinationHost); - - // stream all hints -- range list will be a singleton of "the entire ring" - Token token = StorageService.getPartitioner().getMinimumToken(); - List> ranges = Collections.singletonList(new Range<>(token, token)); - - return new StreamPlan("Hints").transferRanges(hintsDestinationHost, - preferred, - SystemKeyspace.NAME, - ranges, - SystemKeyspace.HINTS) - .execute(); - } - } - public void move(String newToken) throws IOException { try @@ -2240,22 +2170,6 @@ public: */ future<> remove_node(sstring host_id_string); -#if 0 - public void confirmReplication(InetAddress node) - { - // replicatingNodes can be empty in the case where this node used to be a removal coordinator, - // but restarted before all 'replication finished' messages arrived. In that case, we'll - // still go ahead and acknowledge it. - if (!replicatingNodes.isEmpty()) - { - replicatingNodes.remove(node); - } - else - { - logger.info("Received unexpected REPLICATION_FINISHED message from {}. Was this node recently a removal coordinator?", node); - } - } -#endif future get_operation_mode(); future is_starting(); @@ -2437,62 +2351,18 @@ public: if (oldSnitch instanceof DynamicEndpointSnitch) ((DynamicEndpointSnitch)oldSnitch).unregisterMBean(); } +#endif +private: /** * Seed data to the endpoints that will be responsible for it at the future * * @param rangesToStreamByKeyspace keyspaces and data ranges with endpoints included for each * @return async Future for whether stream was success */ - private Future streamRanges(Map, InetAddress>> rangesToStreamByKeyspace) - { - // First, we build a list of ranges to stream to each host, per table - Map>>> sessionsToStreamByKeyspace = new HashMap<>(); - for (Map.Entry, InetAddress>> entry : rangesToStreamByKeyspace.entrySet()) - { - String keyspace = entry.getKey(); - Multimap, InetAddress> rangesWithEndpoints = entry.getValue(); - - if (rangesWithEndpoints.isEmpty()) - continue; - - Map>> rangesPerEndpoint = new HashMap<>(); - for (Map.Entry, InetAddress> endPointEntry : rangesWithEndpoints.entries()) - { - Range range = endPointEntry.getKey(); - InetAddress endpoint = endPointEntry.getValue(); - - List> curRanges = rangesPerEndpoint.get(endpoint); - if (curRanges == null) - { - curRanges = new LinkedList<>(); - rangesPerEndpoint.put(endpoint, curRanges); - } - curRanges.add(range); - } - - sessionsToStreamByKeyspace.put(keyspace, rangesPerEndpoint); - } - - StreamPlan streamPlan = new StreamPlan("Unbootstrap"); - for (Map.Entry>>> entry : sessionsToStreamByKeyspace.entrySet()) - { - String keyspaceName = entry.getKey(); - Map>> rangesPerEndpoint = entry.getValue(); - - for (Map.Entry>> rangesEntry : rangesPerEndpoint.entrySet()) - { - List> ranges = rangesEntry.getValue(); - InetAddress newEndpoint = rangesEntry.getKey(); - InetAddress preferred = SystemKeyspace.getPreferredIP(newEndpoint); - - // TODO each call to transferRanges re-flushes, this is potentially a lot of waste - streamPlan.transferRanges(newEndpoint, preferred, keyspaceName, ranges); - } - } - return streamPlan.execute(); - } + future stream_ranges(std::unordered_map, inet_address>> ranges_to_stream_by_keyspace); +#if 0 /** * Calculate pair of ranges to stream/fetch for given two range collections * (current ranges for keyspace and ranges after move to new token) @@ -2600,6 +2470,7 @@ public: return loader.stream(); } #endif +public: int32_t get_exception_count(); #if 0 public void rescheduleFailedDeletions()