From b77ec2bd6a2f9e8a447346c83c2b0ade3363edb4 Mon Sep 17 00:00:00 2001 From: Amnon Heiman Date: Mon, 2 Nov 2015 13:16:15 +0200 Subject: [PATCH 1/3] Importing token_range and endpoint_details from origin The storage server uses the token_range in origin to return inforamtion about the ring. This import the structures. The functionality in origin is redundant in this case and was not imported. Signed-off-by: Amnon Heiman --- dht/token_range.hh | 39 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 39 insertions(+) create mode 100644 dht/token_range.hh diff --git a/dht/token_range.hh b/dht/token_range.hh new file mode 100644 index 0000000000..95b48c45d2 --- /dev/null +++ b/dht/token_range.hh @@ -0,0 +1,39 @@ +/* + * Copyright (C) 2015 ScyllaDB. + */ + +/* + * This file is part of Scylla. + * + * Scylla is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Scylla is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General License for more details. + * + * You should have received a copy of the GNU General License + * along with Scylla. If not, see . + */ + +#pragma once +#include + +namespace dht { +struct endpoint_details { + sstring _host; + sstring _datacenter; + sstring _rack; +}; + +struct token_range { + sstring _start_token; + sstring _end_token; + std::vector _endpoints; + std::vector _rpc_endpoints; + std::vector _endpoint_details; +}; +} From 3809565dbd822d6c3ee643e7868242321a12d98c Mon Sep 17 00:00:00 2001 From: Amnon Heiman Date: Mon, 2 Nov 2015 13:32:39 +0200 Subject: [PATCH 2/3] storage_service: Add method implementation from origin This patch adds the following methods implementation: getRpcaddress getRangeToAddressMap getRangeToAddressMapInLocalDC describeRing getAllRanges Those methods are used as part of the describe_ring method implementation. Signed-off-by: Amnon Heiman --- service/storage_service.hh | 244 +++++++++++++++---------------------- 1 file changed, 96 insertions(+), 148 deletions(-) diff --git a/service/storage_service.hh b/service/storage_service.hh index 5e1dd8bc2c..6d3fe7cea7 100644 --- a/service/storage_service.hh +++ b/service/storage_service.hh @@ -47,6 +47,7 @@ #include "core/distributed.hh" #include "dht/i_partitioner.hh" #include "dht/boot_strapper.hh" +#include "dht/token_range.hh" #include "core/sleep.hh" #include "gms/application_state.hh" #include "db/system_keyspace.hh" @@ -92,6 +93,8 @@ public: }; private: using token = dht::token; + using token_range = dht::token_range; + using endpoint_details = dht::endpoint_details; using boot_strapper = dht::boot_strapper; using token_metadata = locator::token_metadata; using application_state = gms::application_state; @@ -146,7 +149,7 @@ public: }; private: bool is_auto_bootstrap(); - inet_address get_broadcast_address() { + inet_address get_broadcast_address() const { return utils::fb_utilities::get_broadcast_address(); } static int get_ring_delay() { @@ -448,22 +451,23 @@ public: } return map; } - +#endif /** * Return the rpc address associated with an endpoint as a string. * @param endpoint The endpoint to get rpc address for * @return the rpc address */ - public String getRpcaddress(InetAddress endpoint) - { - if (endpoint.equals(FBUtilities.getBroadcastAddress())) - return DatabaseDescriptor.getBroadcastRpcAddress().getHostAddress(); - else if (Gossiper.instance.getEndpointStateForEndpoint(endpoint).getApplicationState(ApplicationState.RPC_ADDRESS) == null) - return endpoint.getHostAddress(); - else - return Gossiper.instance.getEndpointStateForEndpoint(endpoint).getApplicationState(ApplicationState.RPC_ADDRESS).value; + sstring get_rpc_address(const inet_address& endpoint) const { + if (endpoint != get_broadcast_address()) { + auto v = gms::get_local_gossiper().get_endpoint_state_for_endpoint(endpoint)->get_application_state(gms::application_state::RPC_ADDRESS); + if (v) { + return boost::lexical_cast(v.value()); + } + } + return boost::lexical_cast(endpoint); } +#if 0 /** * for a keyspace, return the ranges and corresponding RPC addresses for a given keyspace. * @param keyspace @@ -500,64 +504,55 @@ public: } return map; } - - public Map, List> getRangeToAddressMap(String keyspace) - { - return getRangeToAddressMap(keyspace, _token_metadata.sortedTokens()); +#endif + std::unordered_map, std::vector> get_range_to_address_map(const sstring& keyspace) const { + return get_range_to_address_map(keyspace, _token_metadata.sorted_tokens()); } - public Map, List> getRangeToAddressMapInLocalDC(String keyspace) - { - Predicate isLocalDC = new Predicate() - { - public boolean apply(InetAddress address) - { - return isLocalDC(address); - } + std::unordered_map, std::vector> get_range_to_address_map_in_local_dc( + const sstring& keyspace) const { + std::function filter = [this](const inet_address& address) { + return is_local_dc(address); }; - Map, List> origMap = getRangeToAddressMap(keyspace, getTokensInLocalDC()); - Map, List> filteredMap = Maps.newHashMap(); - for (Map.Entry, List> entry : origMap.entrySet()) - { - List endpointsInLocalDC = Lists.newArrayList(Collections2.filter(entry.getValue(), isLocalDC)); - filteredMap.put(entry.getKey(), endpointsInLocalDC); + auto orig_map = get_range_to_address_map(keyspace, get_tokens_in_local_dc()); + std::unordered_map, std::vector> filtered_map; + for (auto entry : orig_map) { + filtered_map[entry.first].reserve(entry.second.size()); + std::remove_copy_if(entry.second.begin(), entry.second.end(), + filtered_map[entry.first].begin(), filter); } - return filteredMap; + return filtered_map; } - private List getTokensInLocalDC() - { - List filteredTokens = Lists.newArrayList(); - for (Token token : _token_metadata.sortedTokens()) - { - InetAddress endpoint = _token_metadata.getEndpoint(token); - if (isLocalDC(endpoint)) - filteredTokens.add(token); + std::vector get_tokens_in_local_dc() const { + std::vector filtered_tokens; + for (auto token : _token_metadata.sorted_tokens()) { + auto endpoint = _token_metadata.get_endpoint(token); + if (is_local_dc(*endpoint)) + filtered_tokens.push_back(token); } - return filteredTokens; + return filtered_tokens; } - private boolean isLocalDC(InetAddress targetHost) - { - String remoteDC = DatabaseDescriptor.getEndpointSnitch().getDatacenter(targetHost); - String localDC = DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddress()); - return remoteDC.equals(localDC); + bool is_local_dc(const inet_address& targetHost) const { + auto remote_dc = locator::i_endpoint_snitch::get_local_snitch_ptr()->get_datacenter(targetHost); + auto local_dc = locator::i_endpoint_snitch::get_local_snitch_ptr()->get_datacenter(get_broadcast_address()); + return remote_dc == local_dc; } - private Map, List> getRangeToAddressMap(String keyspace, List sortedTokens) - { + std::unordered_map, std::vector> get_range_to_address_map(const sstring& keyspace, + const std::vector& sorted_tokens) const { // some people just want to get a visual representation of things. Allow null and set it to the first // non-system keyspace. - if (keyspace == null) - keyspace = Schema.instance.getNonSystemKeyspaces().get(0); - - List> ranges = getAllRanges(sortedTokens); - return constructRangeToEndpointMap(keyspace, ranges); + if (keyspace == "" && _db.local().get_non_system_keyspaces().empty()) { + throw std::runtime_error("No keyspace provided and no non system kespace exist"); + } + const sstring& ks = (keyspace == "") ? _db.local().get_non_system_keyspaces()[0] : keyspace; + return construct_range_to_endpoint_map(ks, get_all_ranges(sorted_tokens)); } - /** * The same as {@code describeRing(String)} but converts TokenRange to the String for JMX compatibility * @@ -565,38 +560,15 @@ public: * * @return a List of TokenRange(s) converted to String for the given keyspace */ - public List describeRingJMX(String keyspace) throws IOException - { - List tokenRanges; - try - { - tokenRanges = describeRing(keyspace); - } - catch (InvalidRequestException e) - { - throw new IOException(e.getMessage()); - } - List result = new ArrayList<>(tokenRanges.size()); - for (TokenRange tokenRange : tokenRanges) - result.add(tokenRange.toString()); - - return result; - } - - /** - * The TokenRange for a given keyspace. - * - * @param keyspace The keyspace to fetch information about - * - * @return a List of TokenRange(s) for the given keyspace - * - * @throws InvalidRequestException if there is no ring information available about keyspace + /* + * describeRingJMX will be implemented in the API + * It is left here just as a marker that there is no need to implement it + * here */ - public List describeRing(String keyspace) throws InvalidRequestException - { - return describeRing(keyspace, false); - } + //std::vector describeRingJMX(const sstring& keyspace) const { + +#if 0 /** * The same as {@code describeRing(String)} but considers only the part of the ring formed by nodes in the local DC. @@ -605,54 +577,34 @@ public: { return describeRing(keyspace, true); } +#endif + std::vector describe_ring(const sstring& keyspace, bool include_only_local_dc = false) const { + std::vector ranges; + //Token.TokenFactory tf = getPartitioner().getTokenFactory(); - private List describeRing(String keyspace, boolean includeOnlyLocalDC) throws InvalidRequestException - { - if (!Schema.instance.getKeyspaces().contains(keyspace)) - throw new InvalidRequestException("No such keyspace: " + keyspace); - - if (keyspace == null || Keyspace.open(keyspace).getReplicationStrategy() instanceof LocalStrategy) - throw new InvalidRequestException("There is no ring for the keyspace: " + keyspace); - - List ranges = new ArrayList<>(); - Token.TokenFactory tf = getPartitioner().getTokenFactory(); - - Map, List> rangeToAddressMap = - includeOnlyLocalDC - ? getRangeToAddressMapInLocalDC(keyspace) - : getRangeToAddressMap(keyspace); - - for (Map.Entry, List> entry : rangeToAddressMap.entrySet()) - { - Range range = entry.getKey(); - List addresses = entry.getValue(); - List endpoints = new ArrayList<>(addresses.size()); - List rpc_endpoints = new ArrayList<>(addresses.size()); - List epDetails = new ArrayList<>(addresses.size()); - - for (InetAddress endpoint : addresses) - { - EndpointDetails details = new EndpointDetails(); - details.host = endpoint.getHostAddress(); - details.datacenter = DatabaseDescriptor.getEndpointSnitch().getDatacenter(endpoint); - details.rack = DatabaseDescriptor.getEndpointSnitch().getRack(endpoint); - - endpoints.add(details.host); - rpc_endpoints.add(getRpcaddress(endpoint)); - - epDetails.add(details); + std::unordered_map, std::vector> range_to_address_map = + include_only_local_dc + ? get_range_to_address_map_in_local_dc(keyspace) + : get_range_to_address_map(keyspace); + for (auto entry : range_to_address_map) { + auto range = entry.first; + auto addresses = entry.second; + token_range tr; + for (auto endpoint : addresses) { + endpoint_details details; + details._host = boost::lexical_cast(endpoint); + details._datacenter = locator::i_endpoint_snitch::get_local_snitch_ptr()->get_datacenter(endpoint); + details._rack = locator::i_endpoint_snitch::get_local_snitch_ptr()->get_rack(endpoint); + tr._rpc_endpoints.push_back(get_rpc_address(endpoint)); + tr._endpoints.push_back(details._host); + tr._endpoint_details.push_back(details); } - - TokenRange tr = new TokenRange(tf.toString(range.left.getToken()), tf.toString(range.right.getToken()), endpoints) - .setEndpoint_details(epDetails) - .setRpc_endpoints(rpc_endpoints); - - ranges.add(tr); + ranges.push_back(tr); } - return ranges; } +#if 0 public Map getTokenToEndpointMap() { Map mapInetAddress = _token_metadata.getNormalAndBootstrappingTokenToEndpointMap(); @@ -679,23 +631,23 @@ public: mapOut.put(entry.getKey().getHostAddress(), entry.getValue().toString()); return mapOut; } - +#endif /** * Construct the range to endpoint mapping based on the true view * of the world. * @param ranges * @return mapping of ranges to the replicas responsible for them. */ - private Map, List> constructRangeToEndpointMap(String keyspace, List> ranges) - { - Map, List> rangeToEndpointMap = new HashMap<>(ranges.size()); - for (Range range : ranges) - { - rangeToEndpointMap.put(range, Keyspace.open(keyspace).getReplicationStrategy().getNaturalEndpoints(range.right)); + std::unordered_map, std::vector> construct_range_to_endpoint_map( + const sstring& keyspace, + const std::vector>& ranges) const { + std::unordered_map, std::vector> res; + for (auto r : ranges) { + res[r] = _db.local().find_keyspace(keyspace).get_replication_strategy().get_natural_endpoints(r.end()->value()); } - return rangeToEndpointMap; + return res; } -#endif + public: virtual void on_join(gms::inet_address endpoint, gms::endpoint_state ep_state) override; virtual void before_change(gms::inet_address endpoint, gms::endpoint_state current_state, gms::application_state new_state_key, gms::versioned_value new_value) override; @@ -1682,33 +1634,29 @@ public: return _db.local().find_keyspace(name).get_replication_strategy().get_ranges(ep); } -#if 0 /** * Get all ranges that span the ring given a set * of tokens. All ranges are in sorted order of * ranges. * @return ranges in sorted order */ - public List> getAllRanges(List sortedTokens) - { - if (logger.isDebugEnabled()) - logger.debug("computing ranges for {}", StringUtils.join(sortedTokens, ", ")); + std::vector> get_all_ranges(const std::vector& sorted_tokens) const{ - if (sortedTokens.isEmpty()) - return Collections.emptyList(); - int size = sortedTokens.size(); - List> ranges = new ArrayList<>(size + 1); - for (int i = 1; i < size; ++i) - { - Range range = new Range<>(sortedTokens.get(i - 1), sortedTokens.get(i)); - ranges.add(range); + if (sorted_tokens.empty()) + return std::vector>(); + int size = sorted_tokens.size(); + std::vector> ranges; + for (int i = 1; i < size; ++i) { + range r(range::bound(sorted_tokens[i - 1], false), range::bound(sorted_tokens[i], true)); + ranges.push_back(r); } - Range range = new Range<>(sortedTokens.get(size - 1), sortedTokens.get(0)); - ranges.add(range); + range r(range::bound(sorted_tokens[size - 1], false), + range::bound(sorted_tokens[0], true)); + ranges.push_back(r); return ranges; } - +#if 0 /** * This method returns the N endpoints that are responsible for storing the * specified key i.e for replication. From ef3c6b26475080f17d81ae602462a0199cf72cbc Mon Sep 17 00:00:00 2001 From: Amnon Heiman Date: Mon, 2 Nov 2015 13:39:59 +0200 Subject: [PATCH 3/3] API: Add describe ring API implementation This patch chanages the API to support describe ring instead of describe ring jmx that will be implemented in the jmx server. The API will return a list of objects instead of string. An additional api was added as the equivelent to the jmx call with an empty param. Signed-off-by: Amnon Heiman --- api/api-doc/storage_service.json | 78 ++++++++++++++++++++++++++++++-- api/storage_service.cc | 36 ++++++++++++--- 2 files changed, 105 insertions(+), 9 deletions(-) diff --git a/api/api-doc/storage_service.json b/api/api-doc/storage_service.json index 647ddb76e9..2a77eddae1 100644 --- a/api/api-doc/storage_service.json +++ b/api/api-doc/storage_service.json @@ -290,6 +290,25 @@ } ] }, + { + "path":"/storage_service/describe_ring/", + "operations":[ + { + "method":"GET", + "summary":"The TokenRange for a any keyspace", + "type":"array", + "items":{ + "type":"token_range" + }, + "nickname":"describe_any_ring", + "produces":[ + "application/json" + ], + "parameters":[ + ] + } + ] + }, { "path":"/storage_service/describe_ring/{keyspace}", "operations":[ @@ -298,9 +317,9 @@ "summary":"The TokenRange for a given keyspace", "type":"array", "items":{ - "type":"string" + "type":"token_range" }, - "nickname":"describe_ring_jmx", + "nickname":"describe_ring", "produces":[ "application/json" ], @@ -311,7 +330,7 @@ "required":true, "allowMultiple":false, "type":"string", - "paramType":"query" + "paramType":"path" } ] } @@ -2003,6 +2022,59 @@ "description":"The column family" } } + }, + "endpoint_detail":{ + "id":"endpoint_detail", + "description":"Endpoint detail", + "properties":{ + "host":{ + "type":"string", + "description":"The endpoint host" + }, + "datacenter":{ + "type":"string", + "description":"The endpoint datacenter" + }, + "rack":{ + "type":"string", + "description":"The endpoint rack" + } + } + }, + "token_range":{ + "id":"token_range", + "description":"Endpoint range information", + "properties":{ + "start_token":{ + "type":"string", + "description":"The range start token" + }, + "end_token":{ + "type":"string", + "description":"The range start token" + }, + "endpoints":{ + "type":"array", + "items":{ + "type":"string" + }, + "description":"The endpoints" + }, + "rpc_endpoints":{ + "type":"array", + "items":{ + "type":"string" + }, + "description":"The rpc endpoints" + }, + "endpoint_details":{ + "type":"array", + "items":{ + "type":"endpoint_detail" + }, + "description":"The endpoint details" + } + } } } } diff --git a/api/storage_service.cc b/api/storage_service.cc index 64211d3813..abcb2e53ea 100644 --- a/api/storage_service.cc +++ b/api/storage_service.cc @@ -43,6 +43,29 @@ static sstring validate_keyspace(http_context& ctx, const parameters& param) { throw bad_param_exception("Keyspace " + param["keyspace"] + " Does not exist"); } + +static std::vector describe_ring(const sstring& keyspace) { + std::vector res; + for (auto d : service::get_local_storage_service().describe_ring(keyspace)) { + ss::token_range r; + r.start_token = d._start_token; + r.end_token = d._end_token; + r.endpoints = d._endpoints; + r.rpc_endpoints = d._rpc_endpoints; + for (auto det : d._endpoint_details) { + ss::endpoint_detail ed; + ed.host = det._host; + ed.datacenter = det._datacenter; + if (det._rack != "") { + ed.rack = det._rack; + } + r.endpoint_details.push(ed); + } + res.push_back(r); + } + return res; +} + void set_storage_service(http_context& ctx, routes& r) { ss::local_hostid.set(r, [](std::unique_ptr req) { return db::system_keyspace::get_local_host_id().then([](const utils::UUID& id) { @@ -125,12 +148,13 @@ void set_storage_service(http_context& ctx, routes& r) { return make_ready_future(res); }); - ss::describe_ring_jmx.set(r, [&ctx](std::unique_ptr req) { - //TBD - unimplemented(); - auto keyspace = validate_keyspace(ctx, req->param); - std::vector res; - return make_ready_future(res); + ss::describe_any_ring.set(r, [&ctx](const_req req) { + return describe_ring(""); + }); + + ss::describe_ring.set(r, [&ctx](const_req req) { + auto keyspace = validate_keyspace(ctx, req.param); + return describe_ring(keyspace); }); ss::get_host_id_map.set(r, [](const_req req) {