column_family: Make toppartitions queries more generic

Right now toppartitions can only be invoked on one column family at a time.
This change introduces a natural extension to this functionality,
allowing to specify a list of families.

We provide three ways for filtering in the query parameter "name_list":
    1. A specific column family to include in the form "ks:cf"
    2. A keyspace, telling the server to include all column families in it.
       Specified by omitting the cf name, i.e. "ks:"
    3. All column families, which is represented by an empty list
The list can include any amount of one or both of the 1. and 2. option.

Fixes #4520

Closes #7864
This commit is contained in:
Piotr Wojtczak
2021-01-04 14:15:22 +01:00
committed by Avi Kivity
parent bcbb39999b
commit c1daf2bb24
8 changed files with 195 additions and 60 deletions

View File

@@ -104,6 +104,68 @@
}
]
},
{
"path":"/storage_service/toppartitions/",
"operations":[
{
"method":"GET",
"summary":"Toppartitions query",
"type":"toppartitions_query_results",
"nickname":"toppartitions_generic",
"produces":[
"application/json"
],
"parameters":[
{
"name":"table_filters",
"description":"Optional list of table name filters in keyspace:name format",
"required":false,
"allowMultiple":false,
"type":"array",
"items":{
"type":"string"
},
"paramType":"query"
},
{
"name":"keyspace_filters",
"description":"Optional list of keyspace filters",
"required":false,
"allowMultiple":false,
"type":"array",
"items":{
"type":"string"
},
"paramType":"query"
},
{
"name":"duration",
"description":"Duration (in milliseconds) of monitoring operation",
"required":true,
"allowMultiple":false,
"type": "long",
"paramType":"query"
},
{
"name":"list_size",
"description":"number of the top partitions to list",
"required":false,
"allowMultiple":false,
"type": "long",
"paramType":"query"
},
{
"name":"capacity",
"description":"capacity of stream summary: determines amount of resources used in query processing",
"required":false,
"allowMultiple":false,
"type": "long",
"paramType":"query"
}
]
}
]
},
{
"path":"/storage_service/nodes/leaving",
"operations":[

View File

@@ -28,6 +28,7 @@
#include <algorithm>
#include "db/system_keyspace_view_types.hh"
#include "db/data_listeners.hh"
#include "storage_service.hh"
extern logging::logger apilog;
@@ -983,45 +984,20 @@ void set_column_family(http_context& ctx, routes& r) {
});
});
cf::toppartitions.set(r, [&ctx] (std::unique_ptr<request> req) {
auto name_param = req->param["name"];
auto [ks, cf] = parse_fully_qualified_cf_name(name_param);
auto name = req->param["name"];
auto [ks, cf] = parse_fully_qualified_cf_name(name);
api::req_param<std::chrono::milliseconds, unsigned> duration{*req, "duration", 1000ms};
api::req_param<unsigned> capacity(*req, "capacity", 256);
api::req_param<unsigned> list_size(*req, "list_size", 10);
apilog.info("toppartitions query: name={} duration={} list_size={} capacity={}",
name_param, duration.param, list_size.param, capacity.param);
name, duration.param, list_size.param, capacity.param);
return seastar::do_with(db::toppartitions_query(ctx.db, ks, cf, duration.value, list_size, capacity), [&ctx](auto& q) {
return q.scatter().then([&q] {
return sleep(q.duration()).then([&q] {
return q.gather(q.capacity()).then([&q] (auto topk_results) {
apilog.debug("toppartitions query: processing results");
cf::toppartitions_query_results results;
results.read_cardinality = topk_results.read.size();
results.write_cardinality = topk_results.write.size();
for (auto& d: topk_results.read.top(q.list_size())) {
cf::toppartitions_record r;
r.partition = sstring(d.item);
r.count = d.count;
r.error = d.error;
results.read.push(r);
}
for (auto& d: topk_results.write.top(q.list_size())) {
cf::toppartitions_record r;
r.partition = sstring(d.item);
r.count = d.count;
r.error = d.error;
results.write.push(r);
}
return make_ready_future<json::json_return_type>(results);
});
});
});
return seastar::do_with(db::toppartitions_query(ctx.db, {{ks, cf}}, {}, duration.value, list_size, capacity), [&ctx] (db::toppartitions_query& q) {
return run_toppartitions_query(q, ctx, true);
});
});

View File

@@ -116,4 +116,7 @@ future<json::json_return_type> get_cf_stats(http_context& ctx, const sstring& n
future<json::json_return_type> get_cf_stats(http_context& ctx,
int64_t column_family_stats::*f);
std::tuple<sstring, sstring> parse_fully_qualified_cf_name(sstring name);
}

View File

@@ -23,11 +23,13 @@
#include "api/api-doc/storage_service.json.hh"
#include "db/config.hh"
#include "db/schema_tables.hh"
#include <optional>
#include "utils/hash.hh"
#include <sstream>
#include <time.h>
#include <boost/range/adaptor/map.hpp>
#include <boost/range/adaptor/filtered.hpp>
#include <boost/algorithm/string/trim_all.hpp>
#include <boost/functional/hash.hpp>
#include "service/storage_service.hh"
#include "service/load_meter.hh"
#include "db/commitlog/commitlog.hh"
@@ -49,6 +51,8 @@
#include "locator/token_metadata.hh"
#include "cdc/generation_service.hh"
extern logging::logger apilog;
namespace api {
const locator::token_metadata& http_context::get_token_metadata() {
@@ -96,6 +100,37 @@ static auto wrap_ks_cf(http_context &ctx, ks_cf_func f) {
};
}
seastar::future<json::json_return_type> run_toppartitions_query(db::toppartitions_query& q, http_context &ctx, bool legacy_request) {
namespace cf = httpd::column_family_json;
return q.scatter().then([&q, legacy_request] {
return sleep(q.duration()).then([&q, legacy_request] {
return q.gather(q.capacity()).then([&q, legacy_request] (auto topk_results) {
apilog.debug("toppartitions query: processing results");
cf::toppartitions_query_results results;
results.read_cardinality = topk_results.read.size();
results.write_cardinality = topk_results.write.size();
for (auto& d: topk_results.read.top(q.list_size())) {
cf::toppartitions_record r;
r.partition = (legacy_request ? "" : "(" + d.item.schema->ks_name() + ":" + d.item.schema->cf_name() + ") ") + sstring(d.item);
r.count = d.count;
r.error = d.error;
results.read.push(r);
}
for (auto& d: topk_results.write.top(q.list_size())) {
cf::toppartitions_record r;
r.partition = (legacy_request ? "" : "(" + d.item.schema->ks_name() + ":" + d.item.schema->cf_name() + ") ") + sstring(d.item);
r.count = d.count;
r.error = d.error;
results.write.push(r);
}
return make_ready_future<json::json_return_type>(results);
});
});
});
}
future<json::json_return_type> set_tables_autocompaction(http_context& ctx, const sstring &keyspace, std::vector<sstring> tables, bool enabled) {
if (tables.empty()) {
tables = map_keys(ctx.db.local().find_keyspace(keyspace).metadata().get()->cf_meta_data());
@@ -289,6 +324,56 @@ void set_storage_service(http_context& ctx, routes& r) {
}));
});
ss::toppartitions_generic.set(r, [&ctx] (std::unique_ptr<request> req) {
bool filters_provided = false;
std::unordered_set<std::tuple<sstring, sstring>, utils::tuple_hash> table_filters {};
if (req->query_parameters.contains("table_filters")) {
filters_provided = true;
auto filters = req->get_query_param("table_filters");
std::stringstream ss { filters };
std::string filter;
while (!filters.empty() && ss.good()) {
std::getline(ss, filter, ',');
table_filters.emplace(parse_fully_qualified_cf_name(filter));
}
}
std::unordered_set<sstring> keyspace_filters {};
if (req->query_parameters.contains("keyspace_filters")) {
filters_provided = true;
auto filters = req->get_query_param("keyspace_filters");
std::stringstream ss { filters };
std::string filter;
while (!filters.empty() && ss.good()) {
std::getline(ss, filter, ',');
keyspace_filters.emplace(std::move(filter));
}
}
// when the query is empty return immediately
if (filters_provided && table_filters.empty() && keyspace_filters.empty()) {
apilog.debug("toppartitions query: processing results");
httpd::column_family_json::toppartitions_query_results results;
results.read_cardinality = 0;
results.write_cardinality = 0;
return make_ready_future<json::json_return_type>(results);
}
api::req_param<std::chrono::milliseconds, unsigned> duration{*req, "duration", 1000ms};
api::req_param<unsigned> capacity(*req, "capacity", 256);
api::req_param<unsigned> list_size(*req, "list_size", 10);
apilog.info("toppartitions query: #table_filters={} #keyspace_filters={} duration={} list_size={} capacity={}",
!table_filters.empty() ? std::to_string(table_filters.size()) : "all", !keyspace_filters.empty() ? std::to_string(keyspace_filters.size()) : "all", duration.param, list_size.param, capacity.param);
return seastar::do_with(db::toppartitions_query(ctx.db, std::move(table_filters), std::move(keyspace_filters), duration.value, list_size, capacity), [&ctx] (db::toppartitions_query& q) {
return run_toppartitions_query(q, ctx);
});
});
ss::get_leaving_nodes.set(r, [&ctx](const_req req) {
return container_to_vec(ctx.get_token_metadata().get_leaving_endpoints());
});

View File

@@ -23,6 +23,7 @@
#include <seastar/core/sharded.hh>
#include "api.hh"
#include "db/data_listeners.hh"
namespace cql_transport { class controller; }
class thrift_controller;
@@ -40,5 +41,6 @@ void set_rpc_controller(http_context& ctx, routes& r, thrift_controller& ctl);
void unset_rpc_controller(http_context& ctx, routes& r);
void set_snapshot(http_context& ctx, routes& r, sharded<db::snapshot_ctl>& snap_ctl);
void unset_snapshot(http_context& ctx, routes& r);
seastar::future<json::json_return_type> run_toppartitions_query(db::toppartitions_query& q, http_context &ctx, bool legacy_request = false);
}

View File

@@ -63,7 +63,8 @@ toppartitions_item_key::operator sstring() const {
return oss.str();
}
toppartitions_data_listener::toppartitions_data_listener(database& db, sstring ks, sstring cf) : _db(db), _ks(ks), _cf(cf) {
toppartitions_data_listener::toppartitions_data_listener(database& db, std::unordered_set<std::tuple<sstring, sstring>, utils::tuple_hash> table_filters,
std::unordered_set<sstring> keyspace_filters) : _db(db), _table_filters(std::move(table_filters)), _keyspace_filters(std::move(keyspace_filters)) {
dblog.debug("toppartitions_data_listener: installing {}", fmt::ptr(this));
_db.data_listeners().install(this);
}
@@ -80,25 +81,29 @@ future<> toppartitions_data_listener::stop() {
flat_mutation_reader toppartitions_data_listener::on_read(const schema_ptr& s, const dht::partition_range& range,
const query::partition_slice& slice, flat_mutation_reader&& rd) {
if (s->ks_name() != _ks || s->cf_name() != _cf) {
return std::move(rd);
bool include_all = _table_filters.empty() && _keyspace_filters.empty();
if (include_all || _keyspace_filters.contains(s->ks_name()) || _table_filters.contains({s->ks_name(), s->cf_name()})) {
dblog.trace("toppartitions_data_listener::on_read: {}.{}", s->ks_name(), s->cf_name());
return make_filtering_reader(std::move(rd), [zis = this->weak_from_this(), &range, &slice, s = std::move(s)] (const dht::decorated_key& dk) {
// The data query may be executing after the toppartitions_data_listener object has been removed, so check
if (zis) {
zis->_top_k_read.append(toppartitions_item_key{s, dk});
}
return true;
});
}
dblog.trace("toppartitions_data_listener::on_read: {}.{}", s->ks_name(), s->cf_name());
return make_filtering_reader(std::move(rd), [zis = this->weak_from_this(), &range, &slice, s = std::move(s)] (const dht::decorated_key& dk) {
// The data query may be executing after the toppartitions_data_listener object has been removed, so check
if (zis) {
zis->_top_k_read.append(toppartitions_item_key{s, dk});
}
return true;
});
return std::move(rd);
}
void toppartitions_data_listener::on_write(const schema_ptr& s, const frozen_mutation& m) {
if (s->ks_name() != _ks || s->cf_name() != _cf) {
return;
bool include_all = _table_filters.empty() && _keyspace_filters.empty();
if (include_all || _keyspace_filters.contains(s->ks_name()) || _table_filters.contains({s->ks_name(), s->cf_name()})) {
dblog.trace("toppartitions_data_listener::on_write: {}.{}", s->ks_name(), s->cf_name());
_top_k_write.append(toppartitions_item_key{s, m.decorated_key(*s)});
}
dblog.trace("toppartitions_data_listener::on_write: {}.{}", _ks, _cf);
_top_k_write.append(toppartitions_item_key{s, m.decorated_key(*s)});
}
toppartitions_data_listener::global_top_k::results
@@ -121,15 +126,16 @@ toppartitions_data_listener::localize(const global_top_k::results& r) {
return n;
}
toppartitions_query::toppartitions_query(distributed<database>& xdb, sstring ks, sstring cf,
std::chrono::milliseconds duration, size_t list_size, size_t capacity)
: _xdb(xdb), _ks(ks), _cf(cf), _duration(duration), _list_size(list_size), _capacity(capacity),
toppartitions_query::toppartitions_query(distributed<database>& xdb, std::unordered_set<std::tuple<sstring, sstring>, utils::tuple_hash>&& table_filters,
std::unordered_set<sstring>&& keyspace_filters, std::chrono::milliseconds duration, size_t list_size, size_t capacity)
: _xdb(xdb), _table_filters(std::move(table_filters)), _keyspace_filters(std::move(keyspace_filters)), _duration(duration), _list_size(list_size), _capacity(capacity),
_query(std::make_unique<sharded<toppartitions_data_listener>>()) {
dblog.debug("toppartitions_query on {}.{}", _ks, _cf);
dblog.debug("toppartitions_query on {} column families and {} keyspaces", !_table_filters.empty() ? std::to_string(_table_filters.size()) : "all",
!_keyspace_filters.empty() ? std::to_string(_keyspace_filters.size()) : "all");
}
future<> toppartitions_query::scatter() {
return _query->start(std::ref(_xdb), _ks, _cf);
return _query->start(std::ref(_xdb), _table_filters, _keyspace_filters);
}
using top_t = toppartitions_data_listener::global_top_k::results;

View File

@@ -26,6 +26,7 @@
#include <seastar/core/distributed.hh>
#include <seastar/core/weak_ptr.hh>
#include "utils/hash.hh"
#include "schema_fwd.hh"
#include "flat_mutation_reader.hh"
#include "mutation_reader.hh"
@@ -128,8 +129,8 @@ class toppartitions_data_listener : public data_listener, public weakly_referenc
friend class toppartitions_query;
database& _db;
sstring _ks;
sstring _cf;
std::unordered_set<std::tuple<sstring, sstring>, utils::tuple_hash> _table_filters;
std::unordered_set<sstring> _keyspace_filters;
public:
using top_k = utils::space_saving_top_k<toppartitions_item_key, toppartitions_item_key::hash, toppartitions_item_key::comp>;
@@ -142,7 +143,7 @@ private:
top_k _top_k_write;
public:
toppartitions_data_listener(database& db, sstring ks, sstring cf);
toppartitions_data_listener(database& db, std::unordered_set<std::tuple<sstring, sstring>, utils::tuple_hash> table_filters, std::unordered_set<sstring> keyspace_filters);
~toppartitions_data_listener();
virtual flat_mutation_reader on_read(const schema_ptr& s, const dht::partition_range& range,
@@ -155,16 +156,16 @@ public:
class toppartitions_query {
distributed<database>& _xdb;
sstring _ks;
sstring _cf;
std::unordered_set<std::tuple<sstring, sstring>, utils::tuple_hash> _table_filters;
std::unordered_set<sstring> _keyspace_filters;
std::chrono::milliseconds _duration;
size_t _list_size;
size_t _capacity;
std::unique_ptr<sharded<toppartitions_data_listener>> _query;
public:
toppartitions_query(seastar::distributed<database>& xdb, sstring ks, sstring cf,
std::chrono::milliseconds duration, size_t list_size, size_t capacity);
toppartitions_query(seastar::distributed<database>& xdb, std::unordered_set<std::tuple<sstring, sstring>, utils::tuple_hash>&& table_filters,
std::unordered_set<sstring>&& keyspace_filters, std::chrono::milliseconds duration, size_t list_size, size_t capacity);
struct results {
toppartitions_data_listener::top_k read;

View File

@@ -458,7 +458,7 @@ SEASTAR_TEST_CASE(clear_nonexistent_snapshot) {
SEASTAR_TEST_CASE(toppartitions_cross_shard_schema_ptr) {
return do_with_cql_env_thread([] (cql_test_env& e) {
e.execute_cql("CREATE TABLE ks.tab (id int PRIMARY KEY)").get();
db::toppartitions_query tq(e.db(), "ks", "tab", 1s, 100, 100);
db::toppartitions_query tq(e.db(), {{"ks", "tab"}}, {}, 1s, 100, 100);
tq.scatter().get();
auto q = e.prepare("INSERT INTO ks.tab(id) VALUES(?)").get0();
// Generate many values to ensure crossing shards