diff --git a/api/api-doc/storage_service.json b/api/api-doc/storage_service.json
index 0a083e1cb8..5ee4ae9a04 100644
--- a/api/api-doc/storage_service.json
+++ b/api/api-doc/storage_service.json
@@ -104,6 +104,68 @@
             }
          ]
       },
+      {
+         "path":"/storage_service/toppartitions/",
+         "operations":[
+            {
+               "method":"GET",
+               "summary":"Toppartitions query",
+               "type":"toppartitions_query_results",
+               "nickname":"toppartitions_generic",
+               "produces":[
+                  "application/json"
+               ],
+               "parameters":[
+                  {
+                     "name":"table_filters",
+                     "description":"Optional list of table name filters in keyspace:name format",
+                     "required":false,
+                     "allowMultiple":false,
+                     "type":"array",
+                     "items":{
+                        "type":"string"
+                     },
+                     "paramType":"query"
+                  },
+                  {
+                     "name":"keyspace_filters",
+                     "description":"Optional list of keyspace filters",
+                     "required":false,
+                     "allowMultiple":false,
+                     "type":"array",
+                     "items":{
+                        "type":"string"
+                     },
+                     "paramType":"query"
+                  },
+                  {
+                     "name":"duration",
+                     "description":"Duration (in milliseconds) of monitoring operation",
+                     "required":true,
+                     "allowMultiple":false,
+                     "type": "long",
+                     "paramType":"query"
+                  },
+                  {
+                     "name":"list_size",
+                     "description":"number of the top partitions to list",
+                     "required":false,
+                     "allowMultiple":false,
+                     "type": "long",
+                     "paramType":"query"
+                  },
+                  {
+                     "name":"capacity",
+                     "description":"capacity of stream summary: determines amount of resources used in query processing",
+                     "required":false,
+                     "allowMultiple":false,
+                     "type": "long",
+                     "paramType":"query"
+                  }
+               ]
+            }
+         ]
+      },
       {
          "path":"/storage_service/nodes/leaving",
          "operations":[
diff --git a/api/column_family.cc b/api/column_family.cc
index f6ef7ca670..cda722dd98 100644
--- a/api/column_family.cc
+++ b/api/column_family.cc
@@ -28,6 +28,7 @@
 #include
 #include "db/system_keyspace_view_types.hh"
 #include "db/data_listeners.hh"
+#include "storage_service.hh" // declares run_toppartitions_query(), which the toppartitions handler below now delegates to
 
 extern logging::logger apilog;
 
@@ -983,45 +984,20 @@ void set_column_family(http_context& ctx, routes& r) {
         });
     });
 
+
     cf::toppartitions.set(r, [&ctx] (std::unique_ptr req) {
-        auto name_param = req->param["name"];
-        auto [ks, cf] = parse_fully_qualified_cf_name(name_param);
+        auto name = req->param["name"];
+        auto [ks, cf] = parse_fully_qualified_cf_name(name); // split the request's "name" parameter into keyspace (ks) and table (cf)
 
         api::req_param duration{*req, "duration", 1000ms};
         api::req_param capacity(*req, "capacity", 256);
         api::req_param list_size(*req, "list_size", 10);
 
         apilog.info("toppartitions query: name={} duration={} list_size={} capacity={}",
-            name_param, duration.param, list_size.param, capacity.param);
+            name, duration.param, list_size.param, capacity.param);
 
-        return seastar::do_with(db::toppartitions_query(ctx.db, ks, cf, duration.value, list_size, capacity), [&ctx](auto& q) {
-            return q.scatter().then([&q] {
-                return sleep(q.duration()).then([&q] {
-                    return q.gather(q.capacity()).then([&q] (auto topk_results) {
-                        apilog.debug("toppartitions query: processing results");
-                        cf::toppartitions_query_results results;
-
-                        results.read_cardinality = topk_results.read.size();
-                        results.write_cardinality = topk_results.write.size();
-
-                        for (auto& d: topk_results.read.top(q.list_size())) {
-                            cf::toppartitions_record r;
-                            r.partition = sstring(d.item);
-                            r.count = d.count;
-                            r.error = d.error;
-                            results.read.push(r);
-                        }
-                        for (auto& d: topk_results.write.top(q.list_size())) {
-                            cf::toppartitions_record r;
-                            r.partition = sstring(d.item);
-                            r.count = d.count;
-                            r.error = d.error;
-                            results.write.push(r);
-                        }
-                        return make_ready_future(results);
-                    });
-                });
-            });
+        return seastar::do_with(db::toppartitions_query(ctx.db, {{ks, cf}}, {}, duration.value, list_size, capacity), [&ctx] (db::toppartitions_query& q) { // one table filter {ks, cf}, empty keyspace filter set
+            return run_toppartitions_query(q, ctx, true); // legacy_request=true: partition strings carry no "(ks:cf) " prefix
         });
     });
 
diff --git a/api/column_family.hh b/api/column_family.hh
index e9230d4453..0cffea80a3 100644
--- a/api/column_family.hh
+++ b/api/column_family.hh
@@ -116,4 +116,7 @@ future get_cf_stats(http_context& ctx, const sstring& n
 
 future get_cf_stats(http_context& ctx, int64_t column_family_stats::*f);
 
+
+std::tuple parse_fully_qualified_cf_name(sstring name); // also called by the toppartitions_generic handler in api/storage_service.cc
+
 }
diff --git a/api/storage_service.cc b/api/storage_service.cc
index d9697ce218..38efac84fd 100644
--- a/api/storage_service.cc
+++ b/api/storage_service.cc
@@ -23,11 +23,13 @@
 #include "api/api-doc/storage_service.json.hh"
 #include "db/config.hh"
 #include "db/schema_tables.hh"
-#include
+#include "utils/hash.hh"
+#include
 #include
 #include
 #include
 #include
+#include
 #include "service/storage_service.hh"
 #include "service/load_meter.hh"
 #include "db/commitlog/commitlog.hh"
@@ -49,6 +51,8 @@
 #include "locator/token_metadata.hh"
 #include "cdc/generation_service.hh"
 
+extern logging::logger apilog;
+
 namespace api {
 
 const locator::token_metadata& http_context::get_token_metadata() {
@@ -96,6 +100,37 @@ static auto wrap_ks_cf(http_context &ctx, ks_cf_func f) {
     };
 }
 
+seastar::future run_toppartitions_query(db::toppartitions_query& q, http_context &ctx, bool legacy_request) { // scatter, wait q.duration(), gather, convert to the JSON result; ctx is not used in the body
+    namespace cf = httpd::column_family_json;
+    return q.scatter().then([&q, legacy_request] { // q is captured by reference: the caller must keep it alive until the returned future resolves
+        return sleep(q.duration()).then([&q, legacy_request] {
+            return q.gather(q.capacity()).then([&q, legacy_request] (auto topk_results) {
+                apilog.debug("toppartitions query: processing results");
+                cf::toppartitions_query_results results;
+
+                results.read_cardinality = topk_results.read.size();
+                results.write_cardinality = topk_results.write.size();
+
+                for (auto& d: topk_results.read.top(q.list_size())) {
+                    cf::toppartitions_record r;
+                    r.partition = (legacy_request ? "" : "(" + d.item.schema->ks_name() + ":" + d.item.schema->cf_name() + ") ") + sstring(d.item); // non-legacy requests prefix the key with "(ks:cf) "
+                    r.count = d.count;
+                    r.error = d.error;
+                    results.read.push(r);
+                }
+                for (auto& d: topk_results.write.top(q.list_size())) {
+                    cf::toppartitions_record r;
+                    r.partition = (legacy_request ? "" : "(" + d.item.schema->ks_name() + ":" + d.item.schema->cf_name() + ") ") + sstring(d.item); // same formatting as the read list above
+                    r.count = d.count;
+                    r.error = d.error;
+                    results.write.push(r);
+                }
+                return make_ready_future(results);
+            });
+        });
+    });
+}
+
 future set_tables_autocompaction(http_context& ctx, const sstring &keyspace, std::vector tables, bool enabled) {
     if (tables.empty()) {
         tables = map_keys(ctx.db.local().find_keyspace(keyspace).metadata().get()->cf_meta_data());
@@ -289,6 +324,56 @@ void set_storage_service(http_context& ctx, routes& r) {
         }));
     });
 
+    ss::toppartitions_generic.set(r, [&ctx] (std::unique_ptr req) {
+        bool filters_provided = false; // set when either filter parameter is present in the query, even with an empty value
+
+        std::unordered_set, utils::tuple_hash> table_filters {};
+        if (req->query_parameters.contains("table_filters")) {
+            filters_provided = true;
+            auto filters = req->get_query_param("table_filters");
+            std::stringstream ss { filters };
+            std::string filter;
+            // Loop on the extraction itself: testing ss.good() before reading let the failed read after a trailing ',' be processed as an empty filter.
+            while (std::getline(ss, filter, ',')) {
+                table_filters.emplace(parse_fully_qualified_cf_name(filter));
+            }
+        }
+
+        std::unordered_set keyspace_filters {};
+        if (req->query_parameters.contains("keyspace_filters")) {
+            filters_provided = true;
+            auto filters = req->get_query_param("keyspace_filters");
+            std::stringstream ss { filters };
+            std::string filter;
+            // Same pattern as above: an empty parameter value yields no iterations, a trailing ',' no longer adds an empty keyspace.
+            while (std::getline(ss, filter, ',')) {
+                keyspace_filters.emplace(std::move(filter));
+            }
+        }
+
+        // when the query is empty return immediately
+        if (filters_provided && table_filters.empty() && keyspace_filters.empty()) {
+            apilog.debug("toppartitions query: processing results");
+            httpd::column_family_json::toppartitions_query_results results;
+
+            results.read_cardinality = 0;
+            results.write_cardinality = 0;
+
+            return make_ready_future(results);
+        }
+
+        api::req_param duration{*req, "duration", 1000ms};
+        api::req_param capacity(*req, "capacity", 256);
+        api::req_param list_size(*req, "list_size", 10);
+
+        apilog.info("toppartitions query: #table_filters={} #keyspace_filters={} duration={} list_size={} capacity={}",
+            !table_filters.empty() ? std::to_string(table_filters.size()) : "all", !keyspace_filters.empty() ? std::to_string(keyspace_filters.size()) : "all", duration.param, list_size.param, capacity.param);
+
+        return seastar::do_with(db::toppartitions_query(ctx.db, std::move(table_filters), std::move(keyspace_filters), duration.value, list_size, capacity), [&ctx] (db::toppartitions_query& q) {
+            return run_toppartitions_query(q, ctx); // legacy_request defaults to false, so keys are prefixed with "(ks:cf) "
+        });
+    });
+
     ss::get_leaving_nodes.set(r, [&ctx](const_req req) {
         return container_to_vec(ctx.get_token_metadata().get_leaving_endpoints());
     });
diff --git a/api/storage_service.hh b/api/storage_service.hh
index e1b2ff34cc..06e0940fa1 100644
--- a/api/storage_service.hh
+++ b/api/storage_service.hh
@@ -23,6 +23,7 @@
 
 #include
 #include "api.hh"
+#include "db/data_listeners.hh"
 
 namespace cql_transport { class controller; }
 class thrift_controller;
@@ -40,5 +41,6 @@ void set_rpc_controller(http_context& ctx, routes& r, thrift_controller& ctl);
 void unset_rpc_controller(http_context& ctx, routes& r);
 void set_snapshot(http_context& ctx, routes& r, sharded& snap_ctl);
 void unset_snapshot(http_context& ctx, routes& r);
+seastar::future run_toppartitions_query(db::toppartitions_query& q, http_context &ctx, bool legacy_request = false); // shared by the column_family and storage_service toppartitions handlers
 
 }
diff --git a/db/data_listeners.cc b/db/data_listeners.cc
index 60379b5468..9b1b75f422 100755
--- a/db/data_listeners.cc
+++ b/db/data_listeners.cc
@@ -63,7 +63,8 @@ toppartitions_item_key::operator sstring() const {
     return oss.str();
 }
 
-toppartitions_data_listener::toppartitions_data_listener(database& db, sstring ks, sstring cf) : _db(db), _ks(ks), _cf(cf) {
+toppartitions_data_listener::toppartitions_data_listener(database& db, std::unordered_set, utils::tuple_hash> table_filters,
+        std::unordered_set keyspace_filters) : _db(db), _table_filters(std::move(table_filters)), _keyspace_filters(std::move(keyspace_filters)) {
     dblog.debug("toppartitions_data_listener: installing {}", fmt::ptr(this));
     _db.data_listeners().install(this);
 }
@@ -80,25 +81,29 @@ future<> toppartitions_data_listener::stop() {
 
 flat_mutation_reader toppartitions_data_listener::on_read(const schema_ptr& s, const dht::partition_range& range,
         const query::partition_slice& slice, flat_mutation_reader&& rd) {
-    if (s->ks_name() != _ks || s->cf_name() != _cf) {
-        return std::move(rd);
+    bool include_all = _table_filters.empty() && _keyspace_filters.empty(); // no filters at all: every table is sampled
+
+    if (include_all || _keyspace_filters.contains(s->ks_name()) || _table_filters.contains({s->ks_name(), s->cf_name()})) { // sample when unfiltered, or when the keyspace or the (keyspace, table) pair is listed
+        dblog.trace("toppartitions_data_listener::on_read: {}.{}", s->ks_name(), s->cf_name());
+        return make_filtering_reader(std::move(rd), [zis = this->weak_from_this(), &range, &slice, s = std::move(s)] (const dht::decorated_key& dk) {
+            // The data query may be executing after the toppartitions_data_listener object has been removed, so check
+            if (zis) {
+                zis->_top_k_read.append(toppartitions_item_key{s, dk});
+            }
+            return true;
+        });
     }
-    dblog.trace("toppartitions_data_listener::on_read: {}.{}", s->ks_name(), s->cf_name());
-    return make_filtering_reader(std::move(rd), [zis = this->weak_from_this(), &range, &slice, s = std::move(s)] (const dht::decorated_key& dk) {
-        // The data query may be executing after the toppartitions_data_listener object has been removed, so check
-        if (zis) {
-            zis->_top_k_read.append(toppartitions_item_key{s, dk});
-        }
-        return true;
-    });
+
+    return std::move(rd); // table not selected: hand the reader back unwrapped
 }
 
 void toppartitions_data_listener::on_write(const schema_ptr& s, const frozen_mutation& m) {
-    if (s->ks_name() != _ks || s->cf_name() != _cf) {
-        return;
+    bool include_all = _table_filters.empty() && _keyspace_filters.empty(); // same selection rule as on_read
+
+    if (include_all || _keyspace_filters.contains(s->ks_name()) || _table_filters.contains({s->ks_name(), s->cf_name()})) {
+        dblog.trace("toppartitions_data_listener::on_write: {}.{}", s->ks_name(), s->cf_name());
+        _top_k_write.append(toppartitions_item_key{s, m.decorated_key(*s)});
     }
-    dblog.trace("toppartitions_data_listener::on_write: {}.{}", _ks, _cf);
-    _top_k_write.append(toppartitions_item_key{s, m.decorated_key(*s)});
 }
 
 toppartitions_data_listener::global_top_k::results
@@ -121,15 +126,16 @@ toppartitions_data_listener::localize(const global_top_k::results& r) {
     return n;
 }
 
-toppartitions_query::toppartitions_query(distributed& xdb, sstring ks, sstring cf,
-        std::chrono::milliseconds duration, size_t list_size, size_t capacity)
-        : _xdb(xdb), _ks(ks), _cf(cf), _duration(duration), _list_size(list_size), _capacity(capacity),
+toppartitions_query::toppartitions_query(distributed& xdb, std::unordered_set, utils::tuple_hash>&& table_filters,
+        std::unordered_set&& keyspace_filters, std::chrono::milliseconds duration, size_t list_size, size_t capacity)
+        : _xdb(xdb), _table_filters(std::move(table_filters)), _keyspace_filters(std::move(keyspace_filters)), _duration(duration), _list_size(list_size), _capacity(capacity),
           _query(std::make_unique>()) {
-    dblog.debug("toppartitions_query on {}.{}", _ks, _cf);
+    dblog.debug("toppartitions_query on {} column families and {} keyspaces", !_table_filters.empty() ? std::to_string(_table_filters.size()) : "all", // an empty filter set is logged as "all"
+            !_keyspace_filters.empty() ? std::to_string(_keyspace_filters.size()) : "all");
 }
 
 future<> toppartitions_query::scatter() {
-    return _query->start(std::ref(_xdb), _ks, _cf);
+    return _query->start(std::ref(_xdb), _table_filters, _keyspace_filters); // filters are passed as lvalues; the listener constructor takes them by value
 }
 
 using top_t = toppartitions_data_listener::global_top_k::results;
diff --git a/db/data_listeners.hh b/db/data_listeners.hh
index 9714b54789..3551a8d3cd 100755
--- a/db/data_listeners.hh
+++ b/db/data_listeners.hh
@@ -26,6 +26,7 @@
 #include
 #include
 
+#include "utils/hash.hh"
 #include "schema_fwd.hh"
 #include "flat_mutation_reader.hh"
 #include "mutation_reader.hh"
@@ -128,8 +129,8 @@ class toppartitions_data_listener : public data_listener, public weakly_referenc
     friend class toppartitions_query;
 
     database& _db;
-    sstring _ks;
-    sstring _cf;
+    std::unordered_set, utils::tuple_hash> _table_filters; // (keyspace, table) pairs to sample
+    std::unordered_set _keyspace_filters; // keyspaces whose tables are all sampled
 
 public:
     using top_k = utils::space_saving_top_k;
@@ -142,7 +143,7 @@ private:
     top_k _top_k_write;
 
 public:
-    toppartitions_data_listener(database& db, sstring ks, sstring cf);
+    toppartitions_data_listener(database& db, std::unordered_set, utils::tuple_hash> table_filters, std::unordered_set keyspace_filters);
     ~toppartitions_data_listener();
 
     virtual flat_mutation_reader on_read(const schema_ptr& s, const dht::partition_range& range,
@@ -155,16 +156,16 @@ public:
 
 class toppartitions_query {
     distributed& _xdb;
-    sstring _ks;
-    sstring _cf;
+    std::unordered_set, utils::tuple_hash> _table_filters; // handed to every listener started by scatter()
+    std::unordered_set _keyspace_filters; // handed to every listener started by scatter()
     std::chrono::milliseconds _duration;
     size_t _list_size;
     size_t _capacity;
     std::unique_ptr> _query;
 
 public:
-    toppartitions_query(seastar::distributed& xdb, sstring ks, sstring cf,
-        std::chrono::milliseconds duration, size_t list_size, size_t capacity);
+    toppartitions_query(seastar::distributed& xdb, std::unordered_set, utils::tuple_hash>&& table_filters,
+        std::unordered_set&& keyspace_filters, std::chrono::milliseconds duration, size_t list_size, size_t capacity);
 
     struct results {
         toppartitions_data_listener::top_k read;
diff --git a/test/boost/database_test.cc b/test/boost/database_test.cc
index 28725060a1..4dcd0a7f90 100644
--- a/test/boost/database_test.cc
+++ b/test/boost/database_test.cc
@@ -458,7 +458,7 @@ SEASTAR_TEST_CASE(clear_nonexistent_snapshot) {
 SEASTAR_TEST_CASE(toppartitions_cross_shard_schema_ptr) {
     return do_with_cql_env_thread([] (cql_test_env& e) {
         e.execute_cql("CREATE TABLE ks.tab (id int PRIMARY KEY)").get();
-        db::toppartitions_query tq(e.db(), "ks", "tab", 1s, 100, 100);
+        db::toppartitions_query tq(e.db(), {{"ks", "tab"}}, {}, 1s, 100, 100); // one (keyspace, table) filter, no keyspace filters
         tq.scatter().get();
         auto q = e.prepare("INSERT INTO ks.tab(id) VALUES(?)").get0();
         // Generate many values to ensure crossing shards