diff --git a/api/api-doc/storage_service.json b/api/api-doc/storage_service.json index 647ddb76e9..2a77eddae1 100644 --- a/api/api-doc/storage_service.json +++ b/api/api-doc/storage_service.json @@ -290,6 +290,25 @@ } ] }, + { + "path":"/storage_service/describe_ring/", + "operations":[ + { + "method":"GET", + "summary":"The TokenRange for a any keyspace", + "type":"array", + "items":{ + "type":"token_range" + }, + "nickname":"describe_any_ring", + "produces":[ + "application/json" + ], + "parameters":[ + ] + } + ] + }, { "path":"/storage_service/describe_ring/{keyspace}", "operations":[ @@ -298,9 +317,9 @@ "summary":"The TokenRange for a given keyspace", "type":"array", "items":{ - "type":"string" + "type":"token_range" }, - "nickname":"describe_ring_jmx", + "nickname":"describe_ring", "produces":[ "application/json" ], @@ -311,7 +330,7 @@ "required":true, "allowMultiple":false, "type":"string", - "paramType":"query" + "paramType":"path" } ] } @@ -2003,6 +2022,59 @@ "description":"The column family" } } + }, + "endpoint_detail":{ + "id":"endpoint_detail", + "description":"Endpoint detail", + "properties":{ + "host":{ + "type":"string", + "description":"The endpoint host" + }, + "datacenter":{ + "type":"string", + "description":"The endpoint datacenter" + }, + "rack":{ + "type":"string", + "description":"The endpoint rack" + } + } + }, + "token_range":{ + "id":"token_range", + "description":"Endpoint range information", + "properties":{ + "start_token":{ + "type":"string", + "description":"The range start token" + }, + "end_token":{ + "type":"string", + "description":"The range start token" + }, + "endpoints":{ + "type":"array", + "items":{ + "type":"string" + }, + "description":"The endpoints" + }, + "rpc_endpoints":{ + "type":"array", + "items":{ + "type":"string" + }, + "description":"The rpc endpoints" + }, + "endpoint_details":{ + "type":"array", + "items":{ + "type":"endpoint_detail" + }, + "description":"The endpoint details" + } + } } } } diff --git a/api/storage_service.cc b/api/storage_service.cc index 64211d3813..abcb2e53ea 100644 --- a/api/storage_service.cc +++ b/api/storage_service.cc @@ -43,6 +43,29 @@ static sstring validate_keyspace(http_context& ctx, const parameters& param) { throw bad_param_exception("Keyspace " + param["keyspace"] + " Does not exist"); } + +static std::vector describe_ring(const sstring& keyspace) { + std::vector res; + for (auto d : service::get_local_storage_service().describe_ring(keyspace)) { + ss::token_range r; + r.start_token = d._start_token; + r.end_token = d._end_token; + r.endpoints = d._endpoints; + r.rpc_endpoints = d._rpc_endpoints; + for (auto det : d._endpoint_details) { + ss::endpoint_detail ed; + ed.host = det._host; + ed.datacenter = det._datacenter; + if (det._rack != "") { + ed.rack = det._rack; + } + r.endpoint_details.push(ed); + } + res.push_back(r); + } + return res; +} + void set_storage_service(http_context& ctx, routes& r) { ss::local_hostid.set(r, [](std::unique_ptr req) { return db::system_keyspace::get_local_host_id().then([](const utils::UUID& id) { @@ -125,12 +148,13 @@ void set_storage_service(http_context& ctx, routes& r) { return make_ready_future(res); }); - ss::describe_ring_jmx.set(r, [&ctx](std::unique_ptr req) { - //TBD - unimplemented(); - auto keyspace = validate_keyspace(ctx, req->param); - std::vector res; - return make_ready_future(res); + ss::describe_any_ring.set(r, [&ctx](const_req req) { + return describe_ring(""); + }); + + ss::describe_ring.set(r, [&ctx](const_req req) { + auto keyspace = validate_keyspace(ctx, req.param); + return describe_ring(keyspace); }); ss::get_host_id_map.set(r, [](const_req req) { diff --git a/dht/token_range.hh b/dht/token_range.hh new file mode 100644 index 0000000000..95b48c45d2 --- /dev/null +++ b/dht/token_range.hh @@ -0,0 +1,39 @@ +/* + * Copyright (C) 2015 ScyllaDB. + */ + +/* + * This file is part of Scylla. + * + * Scylla is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Scylla is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General License for more details. + * + * You should have received a copy of the GNU General License + * along with Scylla. If not, see . + */ + +#pragma once +#include + +namespace dht { +struct endpoint_details { + sstring _host; + sstring _datacenter; + sstring _rack; +}; + +struct token_range { + sstring _start_token; + sstring _end_token; + std::vector _endpoints; + std::vector _rpc_endpoints; + std::vector _endpoint_details; +}; +} diff --git a/service/storage_service.hh b/service/storage_service.hh index 5e1dd8bc2c..6d3fe7cea7 100644 --- a/service/storage_service.hh +++ b/service/storage_service.hh @@ -47,6 +47,7 @@ #include "core/distributed.hh" #include "dht/i_partitioner.hh" #include "dht/boot_strapper.hh" +#include "dht/token_range.hh" #include "core/sleep.hh" #include "gms/application_state.hh" #include "db/system_keyspace.hh" @@ -92,6 +93,8 @@ public: }; private: using token = dht::token; + using token_range = dht::token_range; + using endpoint_details = dht::endpoint_details; using boot_strapper = dht::boot_strapper; using token_metadata = locator::token_metadata; using application_state = gms::application_state; @@ -146,7 +149,7 @@ public: }; private: bool is_auto_bootstrap(); - inet_address get_broadcast_address() { + inet_address get_broadcast_address() const { return utils::fb_utilities::get_broadcast_address(); } static int get_ring_delay() { @@ -448,22 +451,23 @@ public: } return map; } - +#endif /** * Return the rpc address associated with an endpoint as a string. * @param endpoint The endpoint to get rpc address for * @return the rpc address */ - public String getRpcaddress(InetAddress endpoint) - { - if (endpoint.equals(FBUtilities.getBroadcastAddress())) - return DatabaseDescriptor.getBroadcastRpcAddress().getHostAddress(); - else if (Gossiper.instance.getEndpointStateForEndpoint(endpoint).getApplicationState(ApplicationState.RPC_ADDRESS) == null) - return endpoint.getHostAddress(); - else - return Gossiper.instance.getEndpointStateForEndpoint(endpoint).getApplicationState(ApplicationState.RPC_ADDRESS).value; + sstring get_rpc_address(const inet_address& endpoint) const { + if (endpoint != get_broadcast_address()) { + auto v = gms::get_local_gossiper().get_endpoint_state_for_endpoint(endpoint)->get_application_state(gms::application_state::RPC_ADDRESS); + if (v) { + return boost::lexical_cast(v.value()); + } + } + return boost::lexical_cast(endpoint); } +#if 0 /** * for a keyspace, return the ranges and corresponding RPC addresses for a given keyspace. * @param keyspace @@ -500,64 +504,55 @@ public: } return map; } - - public Map, List> getRangeToAddressMap(String keyspace) - { - return getRangeToAddressMap(keyspace, _token_metadata.sortedTokens()); +#endif + std::unordered_map, std::vector> get_range_to_address_map(const sstring& keyspace) const { + return get_range_to_address_map(keyspace, _token_metadata.sorted_tokens()); } - public Map, List> getRangeToAddressMapInLocalDC(String keyspace) - { - Predicate isLocalDC = new Predicate() - { - public boolean apply(InetAddress address) - { - return isLocalDC(address); - } + std::unordered_map, std::vector> get_range_to_address_map_in_local_dc( + const sstring& keyspace) const { + std::function filter = [this](const inet_address& address) { + return is_local_dc(address); }; - Map, List> origMap = getRangeToAddressMap(keyspace, getTokensInLocalDC()); - Map, List> filteredMap = Maps.newHashMap(); - for (Map.Entry, List> entry : origMap.entrySet()) - { - List endpointsInLocalDC = Lists.newArrayList(Collections2.filter(entry.getValue(), isLocalDC)); - filteredMap.put(entry.getKey(), endpointsInLocalDC); + auto orig_map = get_range_to_address_map(keyspace, get_tokens_in_local_dc()); + std::unordered_map, std::vector> filtered_map; + for (auto entry : orig_map) { + filtered_map[entry.first].reserve(entry.second.size()); + std::remove_copy_if(entry.second.begin(), entry.second.end(), + filtered_map[entry.first].begin(), filter); } - return filteredMap; + return filtered_map; } - private List getTokensInLocalDC() - { - List filteredTokens = Lists.newArrayList(); - for (Token token : _token_metadata.sortedTokens()) - { - InetAddress endpoint = _token_metadata.getEndpoint(token); - if (isLocalDC(endpoint)) - filteredTokens.add(token); + std::vector get_tokens_in_local_dc() const { + std::vector filtered_tokens; + for (auto token : _token_metadata.sorted_tokens()) { + auto endpoint = _token_metadata.get_endpoint(token); + if (is_local_dc(*endpoint)) + filtered_tokens.push_back(token); } - return filteredTokens; + return filtered_tokens; } - private boolean isLocalDC(InetAddress targetHost) - { - String remoteDC = DatabaseDescriptor.getEndpointSnitch().getDatacenter(targetHost); - String localDC = DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddress()); - return remoteDC.equals(localDC); + bool is_local_dc(const inet_address& targetHost) const { + auto remote_dc = locator::i_endpoint_snitch::get_local_snitch_ptr()->get_datacenter(targetHost); + auto local_dc = locator::i_endpoint_snitch::get_local_snitch_ptr()->get_datacenter(get_broadcast_address()); + return remote_dc == local_dc; } - private Map, List> getRangeToAddressMap(String keyspace, List sortedTokens) - { + std::unordered_map, std::vector> get_range_to_address_map(const sstring& keyspace, + const std::vector& sorted_tokens) const { // some people just want to get a visual representation of things. Allow null and set it to the first // non-system keyspace. - if (keyspace == null) - keyspace = Schema.instance.getNonSystemKeyspaces().get(0); - - List> ranges = getAllRanges(sortedTokens); - return constructRangeToEndpointMap(keyspace, ranges); + if (keyspace == "" && _db.local().get_non_system_keyspaces().empty()) { + throw std::runtime_error("No keyspace provided and no non system kespace exist"); + } + const sstring& ks = (keyspace == "") ? _db.local().get_non_system_keyspaces()[0] : keyspace; + return construct_range_to_endpoint_map(ks, get_all_ranges(sorted_tokens)); } - /** * The same as {@code describeRing(String)} but converts TokenRange to the String for JMX compatibility * @@ -565,38 +560,15 @@ public: * * @return a List of TokenRange(s) converted to String for the given keyspace */ - public List describeRingJMX(String keyspace) throws IOException - { - List tokenRanges; - try - { - tokenRanges = describeRing(keyspace); - } - catch (InvalidRequestException e) - { - throw new IOException(e.getMessage()); - } - List result = new ArrayList<>(tokenRanges.size()); - for (TokenRange tokenRange : tokenRanges) - result.add(tokenRange.toString()); - - return result; - } - - /** - * The TokenRange for a given keyspace. - * - * @param keyspace The keyspace to fetch information about - * - * @return a List of TokenRange(s) for the given keyspace - * - * @throws InvalidRequestException if there is no ring information available about keyspace + /* + * describeRingJMX will be implemented in the API + * It is left here just as a marker that there is no need to implement it + * here */ - public List describeRing(String keyspace) throws InvalidRequestException - { - return describeRing(keyspace, false); - } + //std::vector describeRingJMX(const sstring& keyspace) const { + +#if 0 /** * The same as {@code describeRing(String)} but considers only the part of the ring formed by nodes in the local DC. @@ -605,54 +577,34 @@ public: { return describeRing(keyspace, true); } +#endif + std::vector describe_ring(const sstring& keyspace, bool include_only_local_dc = false) const { + std::vector ranges; + //Token.TokenFactory tf = getPartitioner().getTokenFactory(); - private List describeRing(String keyspace, boolean includeOnlyLocalDC) throws InvalidRequestException - { - if (!Schema.instance.getKeyspaces().contains(keyspace)) - throw new InvalidRequestException("No such keyspace: " + keyspace); - - if (keyspace == null || Keyspace.open(keyspace).getReplicationStrategy() instanceof LocalStrategy) - throw new InvalidRequestException("There is no ring for the keyspace: " + keyspace); - - List ranges = new ArrayList<>(); - Token.TokenFactory tf = getPartitioner().getTokenFactory(); - - Map, List> rangeToAddressMap = - includeOnlyLocalDC - ? getRangeToAddressMapInLocalDC(keyspace) - : getRangeToAddressMap(keyspace); - - for (Map.Entry, List> entry : rangeToAddressMap.entrySet()) - { - Range range = entry.getKey(); - List addresses = entry.getValue(); - List endpoints = new ArrayList<>(addresses.size()); - List rpc_endpoints = new ArrayList<>(addresses.size()); - List epDetails = new ArrayList<>(addresses.size()); - - for (InetAddress endpoint : addresses) - { - EndpointDetails details = new EndpointDetails(); - details.host = endpoint.getHostAddress(); - details.datacenter = DatabaseDescriptor.getEndpointSnitch().getDatacenter(endpoint); - details.rack = DatabaseDescriptor.getEndpointSnitch().getRack(endpoint); - - endpoints.add(details.host); - rpc_endpoints.add(getRpcaddress(endpoint)); - - epDetails.add(details); + std::unordered_map, std::vector> range_to_address_map = + include_only_local_dc + ? get_range_to_address_map_in_local_dc(keyspace) + : get_range_to_address_map(keyspace); + for (auto entry : range_to_address_map) { + auto range = entry.first; + auto addresses = entry.second; + token_range tr; + for (auto endpoint : addresses) { + endpoint_details details; + details._host = boost::lexical_cast(endpoint); + details._datacenter = locator::i_endpoint_snitch::get_local_snitch_ptr()->get_datacenter(endpoint); + details._rack = locator::i_endpoint_snitch::get_local_snitch_ptr()->get_rack(endpoint); + tr._rpc_endpoints.push_back(get_rpc_address(endpoint)); + tr._endpoints.push_back(details._host); + tr._endpoint_details.push_back(details); } - - TokenRange tr = new TokenRange(tf.toString(range.left.getToken()), tf.toString(range.right.getToken()), endpoints) - .setEndpoint_details(epDetails) - .setRpc_endpoints(rpc_endpoints); - - ranges.add(tr); + ranges.push_back(tr); } - return ranges; } +#if 0 public Map getTokenToEndpointMap() { Map mapInetAddress = _token_metadata.getNormalAndBootstrappingTokenToEndpointMap(); @@ -679,23 +631,23 @@ public: mapOut.put(entry.getKey().getHostAddress(), entry.getValue().toString()); return mapOut; } - +#endif /** * Construct the range to endpoint mapping based on the true view * of the world. * @param ranges * @return mapping of ranges to the replicas responsible for them. */ - private Map, List> constructRangeToEndpointMap(String keyspace, List> ranges) - { - Map, List> rangeToEndpointMap = new HashMap<>(ranges.size()); - for (Range range : ranges) - { - rangeToEndpointMap.put(range, Keyspace.open(keyspace).getReplicationStrategy().getNaturalEndpoints(range.right)); + std::unordered_map, std::vector> construct_range_to_endpoint_map( + const sstring& keyspace, + const std::vector>& ranges) const { + std::unordered_map, std::vector> res; + for (auto r : ranges) { + res[r] = _db.local().find_keyspace(keyspace).get_replication_strategy().get_natural_endpoints(r.end()->value()); } - return rangeToEndpointMap; + return res; } -#endif + public: virtual void on_join(gms::inet_address endpoint, gms::endpoint_state ep_state) override; virtual void before_change(gms::inet_address endpoint, gms::endpoint_state current_state, gms::application_state new_state_key, gms::versioned_value new_value) override; @@ -1682,33 +1634,29 @@ public: return _db.local().find_keyspace(name).get_replication_strategy().get_ranges(ep); } -#if 0 /** * Get all ranges that span the ring given a set * of tokens. All ranges are in sorted order of * ranges. * @return ranges in sorted order */ - public List> getAllRanges(List sortedTokens) - { - if (logger.isDebugEnabled()) - logger.debug("computing ranges for {}", StringUtils.join(sortedTokens, ", ")); + std::vector> get_all_ranges(const std::vector& sorted_tokens) const{ - if (sortedTokens.isEmpty()) - return Collections.emptyList(); - int size = sortedTokens.size(); - List> ranges = new ArrayList<>(size + 1); - for (int i = 1; i < size; ++i) - { - Range range = new Range<>(sortedTokens.get(i - 1), sortedTokens.get(i)); - ranges.add(range); + if (sorted_tokens.empty()) + return std::vector>(); + int size = sorted_tokens.size(); + std::vector> ranges; + for (int i = 1; i < size; ++i) { + range r(range::bound(sorted_tokens[i - 1], false), range::bound(sorted_tokens[i], true)); + ranges.push_back(r); } - Range range = new Range<>(sortedTokens.get(size - 1), sortedTokens.get(0)); - ranges.add(range); + range r(range::bound(sorted_tokens[size - 1], false), + range::bound(sorted_tokens[0], true)); + ranges.push_back(r); return ranges; } - +#if 0 /** * This method returns the N endpoints that are responsible for storing the * specified key i.e for replication.