/* * Copyright (C) 2011 Clearspring Technologies, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ /* * Copyright (C) 2018-present ScyllaDB */ /* * SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0 */ #pragma once /* Based on the following implementation ([2]) for the Space-Saving algorithm from [1]. [1] Metwally, A., Agrawal, D., & El Abbadi, A. (2005, January). Efficient computation of frequent and top-k elements in data streams. In International Conference on Database Theory (pp. 398-412). Springer, Berlin, Heidelberg. http://www.cse.ust.hk/~raywong/comp5331/References/EfficientComputationOfFrequentAndTop-kElementsInDataStreams.pdf [2] https://github.com/addthis/stream-lib/blob/master/src/main/java/com/clearspring/analytics/stream/StreamSummary.java The algorithm keeps a map between keys seen and their counts, keeping a bound on the number of tracked keys. Replacement policy evicts the key with the lowest count while inheriting its count, and recording an estimation of the error which results from that. This error estimation can be later used to prove if the distribution we arrived at corresponds to the real top-K, which we can display alongside the results. Accuracy depends on the number of tracked keys. */ #include #include #include #include #include #include #include #include "utils/assert.hh" #include "utils/chunked_vector.hh" namespace utils { using namespace seastar; template , class KeyEqual = std::equal_to> class space_saving_top_k { private: struct bucket; using buckets_iterator = typename std::list::iterator; struct counter { buckets_iterator bucket_it; T item; unsigned count = 0; unsigned error = 0; counter(T item, unsigned count = 0, unsigned error = 0) : item(item), count(count), error(error) {} }; using counter_ptr = lw_shared_ptr; using counters = std::list; using counters_iterator = typename counters::iterator; using counters_map = std::unordered_map; using counters_map_iterator = typename counters_map::iterator; struct bucket { std::list counters; unsigned count; bucket(counter_ptr ctr) { count = ctr->count; counters.push_back(ctr); } bucket(T item, unsigned count, unsigned error) { counters.push_back(make_lw_shared(item, count, error)); this->count = count; } }; using buckets = std::list; size_t _capacity; counters_map _counters_map; buckets _buckets; // buckets list in ascending order bool _valid = true; public: /// capacity: maximum number of elements to be tracked space_saving_top_k(size_t capacity = 256) : _capacity(capacity) {} size_t capacity() const { return _capacity; } size_t size() const { if (!_valid) { throw std::runtime_error("space_saving_top_k state is invalid"); } return _counters_map.size(); } bool valid() const { return _valid; } // returns true if item is a new one bool append(T item, unsigned inc = 1, unsigned err = 0) { return std::get<0>(append_return_all(std::move(item), inc, err)); } // returns optionally dropped item (due to capacity overflow) std::optional append_return_dropped(T item, unsigned inc = 1, unsigned err = 0) { return std::get<1>(append_return_all(std::move(item), inc, err)); } // returns whether an element is new and an optionally dropped item (due to capacity overflow) std::tuple> append_return_all(T item, unsigned inc = 1, unsigned err = 0) { if (!_valid) { return {false, std::optional()}; } try { counters_map_iterator cmap_it = _counters_map.find(item); bool is_new_item = cmap_it == _counters_map.end(); std::optional dropped_item; counters_iterator counter_it; if (is_new_item) { if (size() < _capacity) { _buckets.emplace_front(bucket(std::move(item), 0, err)); // inc added later via increment_counter buckets_iterator new_bucket_it = _buckets.begin(); counter_it = new_bucket_it->counters.begin(); (*counter_it)->bucket_it = new_bucket_it; } else { buckets_iterator min_bucket = _buckets.begin(); SCYLLA_ASSERT(min_bucket != _buckets.end()); counter_it = min_bucket->counters.begin(); SCYLLA_ASSERT(counter_it != min_bucket->counters.end()); counter_ptr ctr = *counter_it; _counters_map.erase(ctr->item); dropped_item = std::exchange(ctr->item, std::move(item)); ctr->error = min_bucket->count + err; } _counters_map[item] = std::move(counter_it); } else { counter_it = cmap_it->second; } increment_counter(counter_it, inc); return {is_new_item, std::move(dropped_item)}; } catch (...) { _valid = false; std::rethrow_exception(std::current_exception()); } } private: void increment_counter(counters_iterator counter_it, unsigned inc) { counter_ptr ctr = *counter_it; buckets_iterator old_bucket_it = ctr->bucket_it; auto& old_buck = *old_bucket_it; old_buck.counters.erase(counter_it); ctr->count += inc; buckets_iterator bi_prev = old_bucket_it; buckets_iterator bi_next = std::next(old_bucket_it); while (bi_next != _buckets.end()) { bucket& buck = *bi_next; if (ctr->count == buck.count) { buck.counters.push_back(ctr); counter_it = std::prev(buck.counters.end()); break; } else if (ctr->count > buck.count) { bi_prev = bi_next; bi_next = std::next(bi_prev); } else { bi_next = _buckets.end(); // create new bucket } } if (bi_next == _buckets.end()) { bucket buck{ctr}; counter_it = buck.counters.begin(); bi_next = _buckets.insert(std::next(bi_prev), std::move(buck)); } ctr->bucket_it = bi_next; _counters_map[ctr->item] = std::move(counter_it); if (old_buck.counters.empty()) { _buckets.erase(old_bucket_it); } } //----------------------------------------------------------------------------------------- // Results public: struct result { T item; unsigned count; unsigned error; }; using results = chunked_vector; results top(unsigned k) const { if (!_valid) { throw std::runtime_error("space_saving_top_k state is invalid"); } results list; // _buckets are in ascending order for (auto b_it = _buckets.rbegin(); b_it != _buckets.rend(); ++b_it) { auto& b = *b_it; for (auto& c: b.counters) { if (list.size() == k) { return list; } list.emplace_back(result{c->item, c->count, c->error}); } } return list; } void append(const results& res) { for (auto& r: res) { append(r.item, r.count, r.error); } } //----------------------------------------------------------------------------------------- // Diagnostics public: template friend std::ostream& operator<<(std::ostream& out, const typename space_saving_top_k::counter& c); template friend std::ostream& operator<<(std::ostream& out, const typename space_saving_top_k::counters_map& counters_map); template friend std::ostream& operator<<(std::ostream& out, const typename space_saving_top_k::buckets& buckets); template friend std::ostream& operator<<(std::ostream& out, const space_saving_top_k& top_k); }; //--------------------------------------------------------------------------------------------- template std::ostream& operator<<(std::ostream& out, const typename space_saving_top_k::counter& c) { out << c.item << " " << c.count << "/" << c.error << " " << &*c.bucket_it; return out; } template std::ostream& operator<<(std::ostream& out, const typename space_saving_top_k::counters_map& counters_map) { out << "{\n"; for (auto const& [item, counter_i]: counters_map) { out << item << " => " << **counter_i << "\n"; } out << "}\n"; return out; } template std::ostream& operator<<(std::ostream& out, const typename space_saving_top_k::buckets& buckets) { for (auto& b: buckets) { out << &b << " " << b.count << " ["; for (auto& c: b.counters) { out << *c << " "; } out << "]\n"; } return out; } template std::ostream& operator<<(std::ostream& out, const space_saving_top_k& top_k) { out << top_k._buckets; out << top_k._counters_map; out << "---\n"; return out; } } // namespace utils