Merge "Adding describering API" from Amnon

"This series adds the missing functionality that the nodetool describering would work.

It import the missing functionality from origin.
After this patch the API:
GET /storage_service/describe_ring/{keyspace}

will be available"
This commit is contained in:
Avi Kivity
2015-11-03 16:54:13 +02:00
4 changed files with 240 additions and 157 deletions

View File

@@ -290,6 +290,25 @@
}
]
},
{
"path":"/storage_service/describe_ring/",
"operations":[
{
"method":"GET",
"summary":"The TokenRange for a any keyspace",
"type":"array",
"items":{
"type":"token_range"
},
"nickname":"describe_any_ring",
"produces":[
"application/json"
],
"parameters":[
]
}
]
},
{
"path":"/storage_service/describe_ring/{keyspace}",
"operations":[
@@ -298,9 +317,9 @@
"summary":"The TokenRange for a given keyspace",
"type":"array",
"items":{
"type":"string"
"type":"token_range"
},
"nickname":"describe_ring_jmx",
"nickname":"describe_ring",
"produces":[
"application/json"
],
@@ -311,7 +330,7 @@
"required":true,
"allowMultiple":false,
"type":"string",
"paramType":"query"
"paramType":"path"
}
]
}
@@ -2003,6 +2022,59 @@
"description":"The column family"
}
}
},
"endpoint_detail":{
"id":"endpoint_detail",
"description":"Endpoint detail",
"properties":{
"host":{
"type":"string",
"description":"The endpoint host"
},
"datacenter":{
"type":"string",
"description":"The endpoint datacenter"
},
"rack":{
"type":"string",
"description":"The endpoint rack"
}
}
},
"token_range":{
"id":"token_range",
"description":"Endpoint range information",
"properties":{
"start_token":{
"type":"string",
"description":"The range start token"
},
"end_token":{
"type":"string",
"description":"The range start token"
},
"endpoints":{
"type":"array",
"items":{
"type":"string"
},
"description":"The endpoints"
},
"rpc_endpoints":{
"type":"array",
"items":{
"type":"string"
},
"description":"The rpc endpoints"
},
"endpoint_details":{
"type":"array",
"items":{
"type":"endpoint_detail"
},
"description":"The endpoint details"
}
}
}
}
}

View File

@@ -43,6 +43,29 @@ static sstring validate_keyspace(http_context& ctx, const parameters& param) {
throw bad_param_exception("Keyspace " + param["keyspace"] + " Does not exist");
}
static std::vector<ss::token_range> describe_ring(const sstring& keyspace) {
std::vector<ss::token_range> res;
for (auto d : service::get_local_storage_service().describe_ring(keyspace)) {
ss::token_range r;
r.start_token = d._start_token;
r.end_token = d._end_token;
r.endpoints = d._endpoints;
r.rpc_endpoints = d._rpc_endpoints;
for (auto det : d._endpoint_details) {
ss::endpoint_detail ed;
ed.host = det._host;
ed.datacenter = det._datacenter;
if (det._rack != "") {
ed.rack = det._rack;
}
r.endpoint_details.push(ed);
}
res.push_back(r);
}
return res;
}
void set_storage_service(http_context& ctx, routes& r) {
ss::local_hostid.set(r, [](std::unique_ptr<request> req) {
return db::system_keyspace::get_local_host_id().then([](const utils::UUID& id) {
@@ -125,12 +148,13 @@ void set_storage_service(http_context& ctx, routes& r) {
return make_ready_future<json::json_return_type>(res);
});
ss::describe_ring_jmx.set(r, [&ctx](std::unique_ptr<request> req) {
//TBD
unimplemented();
auto keyspace = validate_keyspace(ctx, req->param);
std::vector<sstring> res;
return make_ready_future<json::json_return_type>(res);
ss::describe_any_ring.set(r, [&ctx](const_req req) {
return describe_ring("");
});
ss::describe_ring.set(r, [&ctx](const_req req) {
auto keyspace = validate_keyspace(ctx, req.param);
return describe_ring(keyspace);
});
ss::get_host_id_map.set(r, [](const_req req) {

39
dht/token_range.hh Normal file
View File

@@ -0,0 +1,39 @@
/*
* Copyright (C) 2015 ScyllaDB.
*/
/*
* This file is part of Scylla.
*
* Scylla is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Scylla is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General License for more details.
*
* You should have received a copy of the GNU General License
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
*/
#pragma once
#include <vector>
namespace dht {
struct endpoint_details {
sstring _host;
sstring _datacenter;
sstring _rack;
};
struct token_range {
sstring _start_token;
sstring _end_token;
std::vector<sstring> _endpoints;
std::vector<sstring> _rpc_endpoints;
std::vector<endpoint_details> _endpoint_details;
};
}

View File

@@ -47,6 +47,7 @@
#include "core/distributed.hh"
#include "dht/i_partitioner.hh"
#include "dht/boot_strapper.hh"
#include "dht/token_range.hh"
#include "core/sleep.hh"
#include "gms/application_state.hh"
#include "db/system_keyspace.hh"
@@ -92,6 +93,8 @@ public:
};
private:
using token = dht::token;
using token_range = dht::token_range;
using endpoint_details = dht::endpoint_details;
using boot_strapper = dht::boot_strapper;
using token_metadata = locator::token_metadata;
using application_state = gms::application_state;
@@ -146,7 +149,7 @@ public:
};
private:
bool is_auto_bootstrap();
inet_address get_broadcast_address() {
inet_address get_broadcast_address() const {
return utils::fb_utilities::get_broadcast_address();
}
static int get_ring_delay() {
@@ -448,22 +451,23 @@ public:
}
return map;
}
#endif
/**
* Return the rpc address associated with an endpoint as a string.
* @param endpoint The endpoint to get rpc address for
* @return the rpc address
*/
public String getRpcaddress(InetAddress endpoint)
{
if (endpoint.equals(FBUtilities.getBroadcastAddress()))
return DatabaseDescriptor.getBroadcastRpcAddress().getHostAddress();
else if (Gossiper.instance.getEndpointStateForEndpoint(endpoint).getApplicationState(ApplicationState.RPC_ADDRESS) == null)
return endpoint.getHostAddress();
else
return Gossiper.instance.getEndpointStateForEndpoint(endpoint).getApplicationState(ApplicationState.RPC_ADDRESS).value;
sstring get_rpc_address(const inet_address& endpoint) const {
if (endpoint != get_broadcast_address()) {
auto v = gms::get_local_gossiper().get_endpoint_state_for_endpoint(endpoint)->get_application_state(gms::application_state::RPC_ADDRESS);
if (v) {
return boost::lexical_cast<std::string>(v.value());
}
}
return boost::lexical_cast<std::string>(endpoint);
}
#if 0
/**
* for a keyspace, return the ranges and corresponding RPC addresses for a given keyspace.
* @param keyspace
@@ -500,64 +504,55 @@ public:
}
return map;
}
public Map<Range<Token>, List<InetAddress>> getRangeToAddressMap(String keyspace)
{
return getRangeToAddressMap(keyspace, _token_metadata.sortedTokens());
#endif
std::unordered_map<range<token>, std::vector<inet_address>> get_range_to_address_map(const sstring& keyspace) const {
return get_range_to_address_map(keyspace, _token_metadata.sorted_tokens());
}
public Map<Range<Token>, List<InetAddress>> getRangeToAddressMapInLocalDC(String keyspace)
{
Predicate<InetAddress> isLocalDC = new Predicate<InetAddress>()
{
public boolean apply(InetAddress address)
{
return isLocalDC(address);
}
std::unordered_map<range<token>, std::vector<inet_address>> get_range_to_address_map_in_local_dc(
const sstring& keyspace) const {
std::function<bool(const inet_address&)> filter = [this](const inet_address& address) {
return is_local_dc(address);
};
Map<Range<Token>, List<InetAddress>> origMap = getRangeToAddressMap(keyspace, getTokensInLocalDC());
Map<Range<Token>, List<InetAddress>> filteredMap = Maps.newHashMap();
for (Map.Entry<Range<Token>, List<InetAddress>> entry : origMap.entrySet())
{
List<InetAddress> endpointsInLocalDC = Lists.newArrayList(Collections2.filter(entry.getValue(), isLocalDC));
filteredMap.put(entry.getKey(), endpointsInLocalDC);
auto orig_map = get_range_to_address_map(keyspace, get_tokens_in_local_dc());
std::unordered_map<range<token>, std::vector<inet_address>> filtered_map;
for (auto entry : orig_map) {
filtered_map[entry.first].reserve(entry.second.size());
std::remove_copy_if(entry.second.begin(), entry.second.end(),
filtered_map[entry.first].begin(), filter);
}
return filteredMap;
return filtered_map;
}
private List<Token> getTokensInLocalDC()
{
List<Token> filteredTokens = Lists.newArrayList();
for (Token token : _token_metadata.sortedTokens())
{
InetAddress endpoint = _token_metadata.getEndpoint(token);
if (isLocalDC(endpoint))
filteredTokens.add(token);
std::vector<token> get_tokens_in_local_dc() const {
std::vector<token> filtered_tokens;
for (auto token : _token_metadata.sorted_tokens()) {
auto endpoint = _token_metadata.get_endpoint(token);
if (is_local_dc(*endpoint))
filtered_tokens.push_back(token);
}
return filteredTokens;
return filtered_tokens;
}
private boolean isLocalDC(InetAddress targetHost)
{
String remoteDC = DatabaseDescriptor.getEndpointSnitch().getDatacenter(targetHost);
String localDC = DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddress());
return remoteDC.equals(localDC);
bool is_local_dc(const inet_address& targetHost) const {
auto remote_dc = locator::i_endpoint_snitch::get_local_snitch_ptr()->get_datacenter(targetHost);
auto local_dc = locator::i_endpoint_snitch::get_local_snitch_ptr()->get_datacenter(get_broadcast_address());
return remote_dc == local_dc;
}
private Map<Range<Token>, List<InetAddress>> getRangeToAddressMap(String keyspace, List<Token> sortedTokens)
{
std::unordered_map<range<token>, std::vector<inet_address>> get_range_to_address_map(const sstring& keyspace,
const std::vector<token>& sorted_tokens) const {
// some people just want to get a visual representation of things. Allow null and set it to the first
// non-system keyspace.
if (keyspace == null)
keyspace = Schema.instance.getNonSystemKeyspaces().get(0);
List<Range<Token>> ranges = getAllRanges(sortedTokens);
return constructRangeToEndpointMap(keyspace, ranges);
if (keyspace == "" && _db.local().get_non_system_keyspaces().empty()) {
throw std::runtime_error("No keyspace provided and no non system kespace exist");
}
const sstring& ks = (keyspace == "") ? _db.local().get_non_system_keyspaces()[0] : keyspace;
return construct_range_to_endpoint_map(ks, get_all_ranges(sorted_tokens));
}
/**
* The same as {@code describeRing(String)} but converts TokenRange to the String for JMX compatibility
*
@@ -565,38 +560,15 @@ public:
*
* @return a List of TokenRange(s) converted to String for the given keyspace
*/
public List<String> describeRingJMX(String keyspace) throws IOException
{
List<TokenRange> tokenRanges;
try
{
tokenRanges = describeRing(keyspace);
}
catch (InvalidRequestException e)
{
throw new IOException(e.getMessage());
}
List<String> result = new ArrayList<>(tokenRanges.size());
for (TokenRange tokenRange : tokenRanges)
result.add(tokenRange.toString());
return result;
}
/**
* The TokenRange for a given keyspace.
*
* @param keyspace The keyspace to fetch information about
*
* @return a List of TokenRange(s) for the given keyspace
*
* @throws InvalidRequestException if there is no ring information available about keyspace
/*
* describeRingJMX will be implemented in the API
* It is left here just as a marker that there is no need to implement it
* here
*/
public List<TokenRange> describeRing(String keyspace) throws InvalidRequestException
{
return describeRing(keyspace, false);
}
//std::vector<sstring> describeRingJMX(const sstring& keyspace) const {
#if 0
/**
* The same as {@code describeRing(String)} but considers only the part of the ring formed by nodes in the local DC.
@@ -605,54 +577,34 @@ public:
{
return describeRing(keyspace, true);
}
#endif
std::vector<token_range> describe_ring(const sstring& keyspace, bool include_only_local_dc = false) const {
std::vector<token_range> ranges;
//Token.TokenFactory tf = getPartitioner().getTokenFactory();
private List<TokenRange> describeRing(String keyspace, boolean includeOnlyLocalDC) throws InvalidRequestException
{
if (!Schema.instance.getKeyspaces().contains(keyspace))
throw new InvalidRequestException("No such keyspace: " + keyspace);
if (keyspace == null || Keyspace.open(keyspace).getReplicationStrategy() instanceof LocalStrategy)
throw new InvalidRequestException("There is no ring for the keyspace: " + keyspace);
List<TokenRange> ranges = new ArrayList<>();
Token.TokenFactory tf = getPartitioner().getTokenFactory();
Map<Range<Token>, List<InetAddress>> rangeToAddressMap =
includeOnlyLocalDC
? getRangeToAddressMapInLocalDC(keyspace)
: getRangeToAddressMap(keyspace);
for (Map.Entry<Range<Token>, List<InetAddress>> entry : rangeToAddressMap.entrySet())
{
Range range = entry.getKey();
List<InetAddress> addresses = entry.getValue();
List<String> endpoints = new ArrayList<>(addresses.size());
List<String> rpc_endpoints = new ArrayList<>(addresses.size());
List<EndpointDetails> epDetails = new ArrayList<>(addresses.size());
for (InetAddress endpoint : addresses)
{
EndpointDetails details = new EndpointDetails();
details.host = endpoint.getHostAddress();
details.datacenter = DatabaseDescriptor.getEndpointSnitch().getDatacenter(endpoint);
details.rack = DatabaseDescriptor.getEndpointSnitch().getRack(endpoint);
endpoints.add(details.host);
rpc_endpoints.add(getRpcaddress(endpoint));
epDetails.add(details);
std::unordered_map<range<token>, std::vector<inet_address>> range_to_address_map =
include_only_local_dc
? get_range_to_address_map_in_local_dc(keyspace)
: get_range_to_address_map(keyspace);
for (auto entry : range_to_address_map) {
auto range = entry.first;
auto addresses = entry.second;
token_range tr;
for (auto endpoint : addresses) {
endpoint_details details;
details._host = boost::lexical_cast<std::string>(endpoint);
details._datacenter = locator::i_endpoint_snitch::get_local_snitch_ptr()->get_datacenter(endpoint);
details._rack = locator::i_endpoint_snitch::get_local_snitch_ptr()->get_rack(endpoint);
tr._rpc_endpoints.push_back(get_rpc_address(endpoint));
tr._endpoints.push_back(details._host);
tr._endpoint_details.push_back(details);
}
TokenRange tr = new TokenRange(tf.toString(range.left.getToken()), tf.toString(range.right.getToken()), endpoints)
.setEndpoint_details(epDetails)
.setRpc_endpoints(rpc_endpoints);
ranges.add(tr);
ranges.push_back(tr);
}
return ranges;
}
#if 0
public Map<String, String> getTokenToEndpointMap()
{
Map<Token, InetAddress> mapInetAddress = _token_metadata.getNormalAndBootstrappingTokenToEndpointMap();
@@ -679,23 +631,23 @@ public:
mapOut.put(entry.getKey().getHostAddress(), entry.getValue().toString());
return mapOut;
}
#endif
/**
* Construct the range to endpoint mapping based on the true view
* of the world.
* @param ranges
* @return mapping of ranges to the replicas responsible for them.
*/
private Map<Range<Token>, List<InetAddress>> constructRangeToEndpointMap(String keyspace, List<Range<Token>> ranges)
{
Map<Range<Token>, List<InetAddress>> rangeToEndpointMap = new HashMap<>(ranges.size());
for (Range<Token> range : ranges)
{
rangeToEndpointMap.put(range, Keyspace.open(keyspace).getReplicationStrategy().getNaturalEndpoints(range.right));
std::unordered_map<range<token>, std::vector<inet_address>> construct_range_to_endpoint_map(
const sstring& keyspace,
const std::vector<range<token>>& ranges) const {
std::unordered_map<range<token>, std::vector<inet_address>> res;
for (auto r : ranges) {
res[r] = _db.local().find_keyspace(keyspace).get_replication_strategy().get_natural_endpoints(r.end()->value());
}
return rangeToEndpointMap;
return res;
}
#endif
public:
virtual void on_join(gms::inet_address endpoint, gms::endpoint_state ep_state) override;
virtual void before_change(gms::inet_address endpoint, gms::endpoint_state current_state, gms::application_state new_state_key, gms::versioned_value new_value) override;
@@ -1682,33 +1634,29 @@ public:
return _db.local().find_keyspace(name).get_replication_strategy().get_ranges(ep);
}
#if 0
/**
* Get all ranges that span the ring given a set
* of tokens. All ranges are in sorted order of
* ranges.
* @return ranges in sorted order
*/
public List<Range<Token>> getAllRanges(List<Token> sortedTokens)
{
if (logger.isDebugEnabled())
logger.debug("computing ranges for {}", StringUtils.join(sortedTokens, ", "));
std::vector<range<token>> get_all_ranges(const std::vector<token>& sorted_tokens) const{
if (sortedTokens.isEmpty())
return Collections.emptyList();
int size = sortedTokens.size();
List<Range<Token>> ranges = new ArrayList<>(size + 1);
for (int i = 1; i < size; ++i)
{
Range<Token> range = new Range<>(sortedTokens.get(i - 1), sortedTokens.get(i));
ranges.add(range);
if (sorted_tokens.empty())
return std::vector<range<token>>();
int size = sorted_tokens.size();
std::vector<range<token>> ranges;
for (int i = 1; i < size; ++i) {
range<token> r(range<token>::bound(sorted_tokens[i - 1], false), range<token>::bound(sorted_tokens[i], true));
ranges.push_back(r);
}
Range<Token> range = new Range<>(sortedTokens.get(size - 1), sortedTokens.get(0));
ranges.add(range);
range<token> r(range<token>::bound(sorted_tokens[size - 1], false),
range<token>::bound(sorted_tokens[0], true));
ranges.push_back(r);
return ranges;
}
#if 0
/**
* This method returns the N endpoints that are responsible for storing the
* specified key i.e for replication.