column_family: Make toppartitions queries more generic
Right now toppartitions can only be invoked on one column family at a time.
This change introduces a natural extension to this functionality,
allowing to specify a list of families.
We provide three ways for filtering in the query parameter "name_list":
1. A specific column family to include in the form "ks:cf"
2. A keyspace, telling the server to include all column families in it.
Specified by omitting the cf name, i.e. "ks:"
3. All column families, which is represented by an empty list
The list can include any amount of one or both of the 1. and 2. option.
Fixes #4520
Closes #7864
This commit is contained in:
committed by
Avi Kivity
parent
bcbb39999b
commit
c1daf2bb24
@@ -104,6 +104,68 @@
|
||||
}
|
||||
]
|
||||
},
|
||||
{
|
||||
"path":"/storage_service/toppartitions/",
|
||||
"operations":[
|
||||
{
|
||||
"method":"GET",
|
||||
"summary":"Toppartitions query",
|
||||
"type":"toppartitions_query_results",
|
||||
"nickname":"toppartitions_generic",
|
||||
"produces":[
|
||||
"application/json"
|
||||
],
|
||||
"parameters":[
|
||||
{
|
||||
"name":"table_filters",
|
||||
"description":"Optional list of table name filters in keyspace:name format",
|
||||
"required":false,
|
||||
"allowMultiple":false,
|
||||
"type":"array",
|
||||
"items":{
|
||||
"type":"string"
|
||||
},
|
||||
"paramType":"query"
|
||||
},
|
||||
{
|
||||
"name":"keyspace_filters",
|
||||
"description":"Optional list of keyspace filters",
|
||||
"required":false,
|
||||
"allowMultiple":false,
|
||||
"type":"array",
|
||||
"items":{
|
||||
"type":"string"
|
||||
},
|
||||
"paramType":"query"
|
||||
},
|
||||
{
|
||||
"name":"duration",
|
||||
"description":"Duration (in milliseconds) of monitoring operation",
|
||||
"required":true,
|
||||
"allowMultiple":false,
|
||||
"type": "long",
|
||||
"paramType":"query"
|
||||
},
|
||||
{
|
||||
"name":"list_size",
|
||||
"description":"number of the top partitions to list",
|
||||
"required":false,
|
||||
"allowMultiple":false,
|
||||
"type": "long",
|
||||
"paramType":"query"
|
||||
},
|
||||
{
|
||||
"name":"capacity",
|
||||
"description":"capacity of stream summary: determines amount of resources used in query processing",
|
||||
"required":false,
|
||||
"allowMultiple":false,
|
||||
"type": "long",
|
||||
"paramType":"query"
|
||||
}
|
||||
]
|
||||
}
|
||||
]
|
||||
},
|
||||
{
|
||||
"path":"/storage_service/nodes/leaving",
|
||||
"operations":[
|
||||
|
||||
@@ -28,6 +28,7 @@
|
||||
#include <algorithm>
|
||||
#include "db/system_keyspace_view_types.hh"
|
||||
#include "db/data_listeners.hh"
|
||||
#include "storage_service.hh"
|
||||
|
||||
extern logging::logger apilog;
|
||||
|
||||
@@ -983,45 +984,20 @@ void set_column_family(http_context& ctx, routes& r) {
|
||||
});
|
||||
});
|
||||
|
||||
|
||||
cf::toppartitions.set(r, [&ctx] (std::unique_ptr<request> req) {
|
||||
auto name_param = req->param["name"];
|
||||
auto [ks, cf] = parse_fully_qualified_cf_name(name_param);
|
||||
auto name = req->param["name"];
|
||||
auto [ks, cf] = parse_fully_qualified_cf_name(name);
|
||||
|
||||
api::req_param<std::chrono::milliseconds, unsigned> duration{*req, "duration", 1000ms};
|
||||
api::req_param<unsigned> capacity(*req, "capacity", 256);
|
||||
api::req_param<unsigned> list_size(*req, "list_size", 10);
|
||||
|
||||
apilog.info("toppartitions query: name={} duration={} list_size={} capacity={}",
|
||||
name_param, duration.param, list_size.param, capacity.param);
|
||||
name, duration.param, list_size.param, capacity.param);
|
||||
|
||||
return seastar::do_with(db::toppartitions_query(ctx.db, ks, cf, duration.value, list_size, capacity), [&ctx](auto& q) {
|
||||
return q.scatter().then([&q] {
|
||||
return sleep(q.duration()).then([&q] {
|
||||
return q.gather(q.capacity()).then([&q] (auto topk_results) {
|
||||
apilog.debug("toppartitions query: processing results");
|
||||
cf::toppartitions_query_results results;
|
||||
|
||||
results.read_cardinality = topk_results.read.size();
|
||||
results.write_cardinality = topk_results.write.size();
|
||||
|
||||
for (auto& d: topk_results.read.top(q.list_size())) {
|
||||
cf::toppartitions_record r;
|
||||
r.partition = sstring(d.item);
|
||||
r.count = d.count;
|
||||
r.error = d.error;
|
||||
results.read.push(r);
|
||||
}
|
||||
for (auto& d: topk_results.write.top(q.list_size())) {
|
||||
cf::toppartitions_record r;
|
||||
r.partition = sstring(d.item);
|
||||
r.count = d.count;
|
||||
r.error = d.error;
|
||||
results.write.push(r);
|
||||
}
|
||||
return make_ready_future<json::json_return_type>(results);
|
||||
});
|
||||
});
|
||||
});
|
||||
return seastar::do_with(db::toppartitions_query(ctx.db, {{ks, cf}}, {}, duration.value, list_size, capacity), [&ctx] (db::toppartitions_query& q) {
|
||||
return run_toppartitions_query(q, ctx, true);
|
||||
});
|
||||
});
|
||||
|
||||
|
||||
@@ -116,4 +116,7 @@ future<json::json_return_type> get_cf_stats(http_context& ctx, const sstring& n
|
||||
future<json::json_return_type> get_cf_stats(http_context& ctx,
|
||||
int64_t column_family_stats::*f);
|
||||
|
||||
|
||||
std::tuple<sstring, sstring> parse_fully_qualified_cf_name(sstring name);
|
||||
|
||||
}
|
||||
|
||||
@@ -23,11 +23,13 @@
|
||||
#include "api/api-doc/storage_service.json.hh"
|
||||
#include "db/config.hh"
|
||||
#include "db/schema_tables.hh"
|
||||
#include <optional>
|
||||
#include "utils/hash.hh"
|
||||
#include <sstream>
|
||||
#include <time.h>
|
||||
#include <boost/range/adaptor/map.hpp>
|
||||
#include <boost/range/adaptor/filtered.hpp>
|
||||
#include <boost/algorithm/string/trim_all.hpp>
|
||||
#include <boost/functional/hash.hpp>
|
||||
#include "service/storage_service.hh"
|
||||
#include "service/load_meter.hh"
|
||||
#include "db/commitlog/commitlog.hh"
|
||||
@@ -49,6 +51,8 @@
|
||||
#include "locator/token_metadata.hh"
|
||||
#include "cdc/generation_service.hh"
|
||||
|
||||
extern logging::logger apilog;
|
||||
|
||||
namespace api {
|
||||
|
||||
const locator::token_metadata& http_context::get_token_metadata() {
|
||||
@@ -96,6 +100,37 @@ static auto wrap_ks_cf(http_context &ctx, ks_cf_func f) {
|
||||
};
|
||||
}
|
||||
|
||||
seastar::future<json::json_return_type> run_toppartitions_query(db::toppartitions_query& q, http_context &ctx, bool legacy_request) {
|
||||
namespace cf = httpd::column_family_json;
|
||||
return q.scatter().then([&q, legacy_request] {
|
||||
return sleep(q.duration()).then([&q, legacy_request] {
|
||||
return q.gather(q.capacity()).then([&q, legacy_request] (auto topk_results) {
|
||||
apilog.debug("toppartitions query: processing results");
|
||||
cf::toppartitions_query_results results;
|
||||
|
||||
results.read_cardinality = topk_results.read.size();
|
||||
results.write_cardinality = topk_results.write.size();
|
||||
|
||||
for (auto& d: topk_results.read.top(q.list_size())) {
|
||||
cf::toppartitions_record r;
|
||||
r.partition = (legacy_request ? "" : "(" + d.item.schema->ks_name() + ":" + d.item.schema->cf_name() + ") ") + sstring(d.item);
|
||||
r.count = d.count;
|
||||
r.error = d.error;
|
||||
results.read.push(r);
|
||||
}
|
||||
for (auto& d: topk_results.write.top(q.list_size())) {
|
||||
cf::toppartitions_record r;
|
||||
r.partition = (legacy_request ? "" : "(" + d.item.schema->ks_name() + ":" + d.item.schema->cf_name() + ") ") + sstring(d.item);
|
||||
r.count = d.count;
|
||||
r.error = d.error;
|
||||
results.write.push(r);
|
||||
}
|
||||
return make_ready_future<json::json_return_type>(results);
|
||||
});
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
future<json::json_return_type> set_tables_autocompaction(http_context& ctx, const sstring &keyspace, std::vector<sstring> tables, bool enabled) {
|
||||
if (tables.empty()) {
|
||||
tables = map_keys(ctx.db.local().find_keyspace(keyspace).metadata().get()->cf_meta_data());
|
||||
@@ -289,6 +324,56 @@ void set_storage_service(http_context& ctx, routes& r) {
|
||||
}));
|
||||
});
|
||||
|
||||
ss::toppartitions_generic.set(r, [&ctx] (std::unique_ptr<request> req) {
|
||||
bool filters_provided = false;
|
||||
|
||||
std::unordered_set<std::tuple<sstring, sstring>, utils::tuple_hash> table_filters {};
|
||||
if (req->query_parameters.contains("table_filters")) {
|
||||
filters_provided = true;
|
||||
auto filters = req->get_query_param("table_filters");
|
||||
std::stringstream ss { filters };
|
||||
std::string filter;
|
||||
while (!filters.empty() && ss.good()) {
|
||||
std::getline(ss, filter, ',');
|
||||
table_filters.emplace(parse_fully_qualified_cf_name(filter));
|
||||
}
|
||||
}
|
||||
|
||||
std::unordered_set<sstring> keyspace_filters {};
|
||||
if (req->query_parameters.contains("keyspace_filters")) {
|
||||
filters_provided = true;
|
||||
auto filters = req->get_query_param("keyspace_filters");
|
||||
std::stringstream ss { filters };
|
||||
std::string filter;
|
||||
while (!filters.empty() && ss.good()) {
|
||||
std::getline(ss, filter, ',');
|
||||
keyspace_filters.emplace(std::move(filter));
|
||||
}
|
||||
}
|
||||
|
||||
// when the query is empty return immediately
|
||||
if (filters_provided && table_filters.empty() && keyspace_filters.empty()) {
|
||||
apilog.debug("toppartitions query: processing results");
|
||||
httpd::column_family_json::toppartitions_query_results results;
|
||||
|
||||
results.read_cardinality = 0;
|
||||
results.write_cardinality = 0;
|
||||
|
||||
return make_ready_future<json::json_return_type>(results);
|
||||
}
|
||||
|
||||
api::req_param<std::chrono::milliseconds, unsigned> duration{*req, "duration", 1000ms};
|
||||
api::req_param<unsigned> capacity(*req, "capacity", 256);
|
||||
api::req_param<unsigned> list_size(*req, "list_size", 10);
|
||||
|
||||
apilog.info("toppartitions query: #table_filters={} #keyspace_filters={} duration={} list_size={} capacity={}",
|
||||
!table_filters.empty() ? std::to_string(table_filters.size()) : "all", !keyspace_filters.empty() ? std::to_string(keyspace_filters.size()) : "all", duration.param, list_size.param, capacity.param);
|
||||
|
||||
return seastar::do_with(db::toppartitions_query(ctx.db, std::move(table_filters), std::move(keyspace_filters), duration.value, list_size, capacity), [&ctx] (db::toppartitions_query& q) {
|
||||
return run_toppartitions_query(q, ctx);
|
||||
});
|
||||
});
|
||||
|
||||
ss::get_leaving_nodes.set(r, [&ctx](const_req req) {
|
||||
return container_to_vec(ctx.get_token_metadata().get_leaving_endpoints());
|
||||
});
|
||||
|
||||
@@ -23,6 +23,7 @@
|
||||
|
||||
#include <seastar/core/sharded.hh>
|
||||
#include "api.hh"
|
||||
#include "db/data_listeners.hh"
|
||||
|
||||
namespace cql_transport { class controller; }
|
||||
class thrift_controller;
|
||||
@@ -40,5 +41,6 @@ void set_rpc_controller(http_context& ctx, routes& r, thrift_controller& ctl);
|
||||
void unset_rpc_controller(http_context& ctx, routes& r);
|
||||
void set_snapshot(http_context& ctx, routes& r, sharded<db::snapshot_ctl>& snap_ctl);
|
||||
void unset_snapshot(http_context& ctx, routes& r);
|
||||
seastar::future<json::json_return_type> run_toppartitions_query(db::toppartitions_query& q, http_context &ctx, bool legacy_request = false);
|
||||
|
||||
}
|
||||
|
||||
@@ -63,7 +63,8 @@ toppartitions_item_key::operator sstring() const {
|
||||
return oss.str();
|
||||
}
|
||||
|
||||
toppartitions_data_listener::toppartitions_data_listener(database& db, sstring ks, sstring cf) : _db(db), _ks(ks), _cf(cf) {
|
||||
toppartitions_data_listener::toppartitions_data_listener(database& db, std::unordered_set<std::tuple<sstring, sstring>, utils::tuple_hash> table_filters,
|
||||
std::unordered_set<sstring> keyspace_filters) : _db(db), _table_filters(std::move(table_filters)), _keyspace_filters(std::move(keyspace_filters)) {
|
||||
dblog.debug("toppartitions_data_listener: installing {}", fmt::ptr(this));
|
||||
_db.data_listeners().install(this);
|
||||
}
|
||||
@@ -80,25 +81,29 @@ future<> toppartitions_data_listener::stop() {
|
||||
|
||||
flat_mutation_reader toppartitions_data_listener::on_read(const schema_ptr& s, const dht::partition_range& range,
|
||||
const query::partition_slice& slice, flat_mutation_reader&& rd) {
|
||||
if (s->ks_name() != _ks || s->cf_name() != _cf) {
|
||||
return std::move(rd);
|
||||
bool include_all = _table_filters.empty() && _keyspace_filters.empty();
|
||||
|
||||
if (include_all || _keyspace_filters.contains(s->ks_name()) || _table_filters.contains({s->ks_name(), s->cf_name()})) {
|
||||
dblog.trace("toppartitions_data_listener::on_read: {}.{}", s->ks_name(), s->cf_name());
|
||||
return make_filtering_reader(std::move(rd), [zis = this->weak_from_this(), &range, &slice, s = std::move(s)] (const dht::decorated_key& dk) {
|
||||
// The data query may be executing after the toppartitions_data_listener object has been removed, so check
|
||||
if (zis) {
|
||||
zis->_top_k_read.append(toppartitions_item_key{s, dk});
|
||||
}
|
||||
return true;
|
||||
});
|
||||
}
|
||||
dblog.trace("toppartitions_data_listener::on_read: {}.{}", s->ks_name(), s->cf_name());
|
||||
return make_filtering_reader(std::move(rd), [zis = this->weak_from_this(), &range, &slice, s = std::move(s)] (const dht::decorated_key& dk) {
|
||||
// The data query may be executing after the toppartitions_data_listener object has been removed, so check
|
||||
if (zis) {
|
||||
zis->_top_k_read.append(toppartitions_item_key{s, dk});
|
||||
}
|
||||
return true;
|
||||
});
|
||||
|
||||
return std::move(rd);
|
||||
}
|
||||
|
||||
void toppartitions_data_listener::on_write(const schema_ptr& s, const frozen_mutation& m) {
|
||||
if (s->ks_name() != _ks || s->cf_name() != _cf) {
|
||||
return;
|
||||
bool include_all = _table_filters.empty() && _keyspace_filters.empty();
|
||||
|
||||
if (include_all || _keyspace_filters.contains(s->ks_name()) || _table_filters.contains({s->ks_name(), s->cf_name()})) {
|
||||
dblog.trace("toppartitions_data_listener::on_write: {}.{}", s->ks_name(), s->cf_name());
|
||||
_top_k_write.append(toppartitions_item_key{s, m.decorated_key(*s)});
|
||||
}
|
||||
dblog.trace("toppartitions_data_listener::on_write: {}.{}", _ks, _cf);
|
||||
_top_k_write.append(toppartitions_item_key{s, m.decorated_key(*s)});
|
||||
}
|
||||
|
||||
toppartitions_data_listener::global_top_k::results
|
||||
@@ -121,15 +126,16 @@ toppartitions_data_listener::localize(const global_top_k::results& r) {
|
||||
return n;
|
||||
}
|
||||
|
||||
toppartitions_query::toppartitions_query(distributed<database>& xdb, sstring ks, sstring cf,
|
||||
std::chrono::milliseconds duration, size_t list_size, size_t capacity)
|
||||
: _xdb(xdb), _ks(ks), _cf(cf), _duration(duration), _list_size(list_size), _capacity(capacity),
|
||||
toppartitions_query::toppartitions_query(distributed<database>& xdb, std::unordered_set<std::tuple<sstring, sstring>, utils::tuple_hash>&& table_filters,
|
||||
std::unordered_set<sstring>&& keyspace_filters, std::chrono::milliseconds duration, size_t list_size, size_t capacity)
|
||||
: _xdb(xdb), _table_filters(std::move(table_filters)), _keyspace_filters(std::move(keyspace_filters)), _duration(duration), _list_size(list_size), _capacity(capacity),
|
||||
_query(std::make_unique<sharded<toppartitions_data_listener>>()) {
|
||||
dblog.debug("toppartitions_query on {}.{}", _ks, _cf);
|
||||
dblog.debug("toppartitions_query on {} column families and {} keyspaces", !_table_filters.empty() ? std::to_string(_table_filters.size()) : "all",
|
||||
!_keyspace_filters.empty() ? std::to_string(_keyspace_filters.size()) : "all");
|
||||
}
|
||||
|
||||
future<> toppartitions_query::scatter() {
|
||||
return _query->start(std::ref(_xdb), _ks, _cf);
|
||||
return _query->start(std::ref(_xdb), _table_filters, _keyspace_filters);
|
||||
}
|
||||
|
||||
using top_t = toppartitions_data_listener::global_top_k::results;
|
||||
|
||||
@@ -26,6 +26,7 @@
|
||||
#include <seastar/core/distributed.hh>
|
||||
#include <seastar/core/weak_ptr.hh>
|
||||
|
||||
#include "utils/hash.hh"
|
||||
#include "schema_fwd.hh"
|
||||
#include "flat_mutation_reader.hh"
|
||||
#include "mutation_reader.hh"
|
||||
@@ -128,8 +129,8 @@ class toppartitions_data_listener : public data_listener, public weakly_referenc
|
||||
friend class toppartitions_query;
|
||||
|
||||
database& _db;
|
||||
sstring _ks;
|
||||
sstring _cf;
|
||||
std::unordered_set<std::tuple<sstring, sstring>, utils::tuple_hash> _table_filters;
|
||||
std::unordered_set<sstring> _keyspace_filters;
|
||||
|
||||
public:
|
||||
using top_k = utils::space_saving_top_k<toppartitions_item_key, toppartitions_item_key::hash, toppartitions_item_key::comp>;
|
||||
@@ -142,7 +143,7 @@ private:
|
||||
top_k _top_k_write;
|
||||
|
||||
public:
|
||||
toppartitions_data_listener(database& db, sstring ks, sstring cf);
|
||||
toppartitions_data_listener(database& db, std::unordered_set<std::tuple<sstring, sstring>, utils::tuple_hash> table_filters, std::unordered_set<sstring> keyspace_filters);
|
||||
~toppartitions_data_listener();
|
||||
|
||||
virtual flat_mutation_reader on_read(const schema_ptr& s, const dht::partition_range& range,
|
||||
@@ -155,16 +156,16 @@ public:
|
||||
|
||||
class toppartitions_query {
|
||||
distributed<database>& _xdb;
|
||||
sstring _ks;
|
||||
sstring _cf;
|
||||
std::unordered_set<std::tuple<sstring, sstring>, utils::tuple_hash> _table_filters;
|
||||
std::unordered_set<sstring> _keyspace_filters;
|
||||
std::chrono::milliseconds _duration;
|
||||
size_t _list_size;
|
||||
size_t _capacity;
|
||||
std::unique_ptr<sharded<toppartitions_data_listener>> _query;
|
||||
|
||||
public:
|
||||
toppartitions_query(seastar::distributed<database>& xdb, sstring ks, sstring cf,
|
||||
std::chrono::milliseconds duration, size_t list_size, size_t capacity);
|
||||
toppartitions_query(seastar::distributed<database>& xdb, std::unordered_set<std::tuple<sstring, sstring>, utils::tuple_hash>&& table_filters,
|
||||
std::unordered_set<sstring>&& keyspace_filters, std::chrono::milliseconds duration, size_t list_size, size_t capacity);
|
||||
|
||||
struct results {
|
||||
toppartitions_data_listener::top_k read;
|
||||
|
||||
@@ -458,7 +458,7 @@ SEASTAR_TEST_CASE(clear_nonexistent_snapshot) {
|
||||
SEASTAR_TEST_CASE(toppartitions_cross_shard_schema_ptr) {
|
||||
return do_with_cql_env_thread([] (cql_test_env& e) {
|
||||
e.execute_cql("CREATE TABLE ks.tab (id int PRIMARY KEY)").get();
|
||||
db::toppartitions_query tq(e.db(), "ks", "tab", 1s, 100, 100);
|
||||
db::toppartitions_query tq(e.db(), {{"ks", "tab"}}, {}, 1s, 100, 100);
|
||||
tq.scatter().get();
|
||||
auto q = e.prepare("INSERT INTO ks.tab(id) VALUES(?)").get0();
|
||||
// Generate many values to ensure crossing shards
|
||||
|
||||
Reference in New Issue
Block a user