utils: add rolling max tracker

We will use it later to track parser memory
usage via per-query samples.

Tests runtime in dev: 1.6s
This commit is contained in:
Marcin Maliszkiewicz
2026-03-05 18:18:16 +01:00
parent 37aeba9c8c
commit 5b2a07b408
4 changed files with 280 additions and 0 deletions

View File

@@ -618,6 +618,7 @@ scylla_tests = set([
'test/boost/reservoir_sampling_test',
'test/boost/result_utils_test',
'test/boost/rest_client_test',
'test/boost/rolling_max_tracker_test',
'test/boost/reusable_buffer_test',
'test/boost/rust_test',
'test/boost/s3_test',
@@ -1586,6 +1587,7 @@ pure_boost_tests = set([
'test/boost/wrapping_interval_test',
'test/boost/range_tombstone_list_test',
'test/boost/reservoir_sampling_test',
'test/boost/rolling_max_tracker_test',
'test/boost/serialization_test',
'test/boost/small_vector_test',
'test/boost/top_k_test',
@@ -1734,6 +1736,7 @@ deps['test/boost/url_parse_test'] = ['utils/http.cc', 'test/boost/url_parse_test
deps['test/boost/murmur_hash_test'] = ['bytes.cc', 'utils/murmur_hash.cc', 'test/boost/murmur_hash_test.cc']
deps['test/boost/allocation_strategy_test'] = ['test/boost/allocation_strategy_test.cc', 'utils/logalloc.cc', 'utils/dynamic_bitset.cc', 'utils/labels.cc']
deps['test/boost/log_heap_test'] = ['test/boost/log_heap_test.cc']
deps['test/boost/rolling_max_tracker_test'] = ['test/boost/rolling_max_tracker_test.cc']
deps['test/boost/estimated_histogram_test'] = ['test/boost/estimated_histogram_test.cc']
deps['test/boost/summary_test'] = ['test/boost/summary_test.cc']
deps['test/boost/anchorless_list_test'] = ['test/boost/anchorless_list_test.cc']

View File

@@ -92,6 +92,9 @@
#include <cstring>
#include <ctime>
#include <deque>
#include "utils/rolling_max_tracker.hh"
#include <endian.h>
#include <exception>
#if __has_include(<execinfo.h>)

View File

@@ -0,0 +1,215 @@
/*
* Copyright (C) 2025-present ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#define BOOST_TEST_MODULE rolling_max_tracker
#include <boost/test/unit_test.hpp>
#include <algorithm>
#include <seastar/core/bitops.hh>
#include "utils/rolling_max_tracker.hh"
// Helper: the value current_max() is expected to report for raw sample `v`.
// Follows the same steps as the tracker: clamp the sample to at least 1,
// take log2ceil of it, then shift 1 left by that exponent to get a power
// of two.
static size_t rounded(size_t v) {
    const size_t clamped = std::max(v, size_t(1));
    const auto exponent = seastar::log2ceil(clamped);
    return size_t(1) << exponent;
}
// A tracker that has never received a sample reports a maximum of zero.
BOOST_AUTO_TEST_CASE(test_empty_tracker_returns_zero) {
    const utils::rolling_max_tracker fresh(10);
    BOOST_REQUIRE_EQUAL(fresh.current_max(), 0u);
}
// With a single sample, the reported maximum is that sample rounded up
// to a power of two.
BOOST_AUTO_TEST_CASE(test_single_sample) {
    constexpr size_t sample = 100;
    utils::rolling_max_tracker t(10);
    t.add_sample(sample);
    BOOST_REQUIRE_EQUAL(t.current_max(), rounded(sample));
}
// While the window (10) is not yet full, the maximum is the largest sample
// seen so far, regardless of the order in which samples arrived.
BOOST_AUTO_TEST_CASE(test_max_tracks_largest_in_window) {
    const size_t samples[] = {5, 20, 10};
    utils::rolling_max_tracker t(10);
    for (const size_t s : samples) {
        t.add_sample(s);
    }
    BOOST_REQUIRE_EQUAL(t.current_max(), rounded(20));
}
// Strictly increasing input (1..10): each new sample is immediately the
// maximum, both before and after the window of 5 has been exceeded.
BOOST_AUTO_TEST_CASE(test_increasing_samples) {
    utils::rolling_max_tracker t(5);
    size_t sample = 0;
    while (sample < 10) {
        ++sample;
        t.add_sample(sample);
        BOOST_REQUIRE_EQUAL(t.current_max(), rounded(sample));
    }
}
// Strictly decreasing input: the maximum is always the oldest sample still
// inside the window, so it steps down as old samples are evicted.
BOOST_AUTO_TEST_CASE(test_decreasing_samples) {
    utils::rolling_max_tracker t(5);

    // Fill the window exactly with 100, 90, 80, 70, 60.
    for (size_t s = 100; s >= 60; s -= 10) {
        t.add_sample(s);
    }
    BOOST_REQUIRE_EQUAL(t.current_max(), rounded(100));

    // Samples 50, 40, 30, 20: each one evicts the oldest entry, so the
    // oldest survivor (and hence the maximum) is the new sample + 40.
    for (size_t s = 50; s >= 20; s -= 10) {
        t.add_sample(s);
        BOOST_REQUIRE_EQUAL(t.current_max(), rounded(s + 40));
    }
}
// A large sample stops contributing once it slides out of the window.
BOOST_AUTO_TEST_CASE(test_max_expires_from_window) {
    utils::rolling_max_tracker t(3);

    // Window exactly full: 100 is the oldest sample but still counts.
    const size_t initial[] = {100, 1, 2};
    for (const size_t s : initial) {
        t.add_sample(s);
    }
    BOOST_REQUIRE_EQUAL(t.current_max(), rounded(100));

    // A fourth sample pushes 100 out; the window now holds 1, 2, 3.
    t.add_sample(3);
    BOOST_REQUIRE_EQUAL(t.current_max(), rounded(3));
}
// A new, larger sample supersedes all smaller samples before it, and it
// expires from the window at the right time.
BOOST_AUTO_TEST_CASE(test_new_max_replaces_smaller_entries) {
    utils::rolling_max_tracker t(5);

    const size_t descending[] = {10, 5, 3, 1};
    for (const size_t s : descending) {
        t.add_sample(s);
    }
    BOOST_REQUIRE_EQUAL(t.current_max(), rounded(10));

    // 50 is larger than everything currently tracked.
    t.add_sample(50);
    BOOST_REQUIRE_EQUAL(t.current_max(), rounded(50));

    // Four small samples fill the window behind 50; 50 is still inside it.
    for (int i = 0; i < 4; ++i) {
        t.add_sample(1);
    }
    BOOST_REQUIRE_EQUAL(t.current_max(), rounded(50));

    // The fifth small sample pushes 50 out, leaving only ones.
    t.add_sample(1);
    BOOST_REQUIRE_EQUAL(t.current_max(), rounded(1));
}
// With a window of one, the maximum is always just the latest sample,
// whether it is larger or smaller than the previous one.
BOOST_AUTO_TEST_CASE(test_window_size_one) {
    const size_t samples[] = {100, 5, 200};
    utils::rolling_max_tracker t(1);
    for (const size_t s : samples) {
        t.add_sample(s);
        BOOST_REQUIRE_EQUAL(t.current_max(), rounded(s));
    }
}
// With a window of two, the maximum covers the latest two samples only.
BOOST_AUTO_TEST_CASE(test_window_size_two) {
    // Each step adds `sample` and then expects the maximum to equal
    // `expected_raw` rounded up to a power of two.
    struct step {
        size_t sample;
        size_t expected_raw;
    };
    const step steps[] = {
        {100, 100}, // window: 100
        {5, 100},   // window: 100, 5
        {200, 200}, // window: 5, 200
        {10, 200},  // window: 200, 10
        {10, 10},   // window: 10, 10
    };

    utils::rolling_max_tracker t(2);
    for (const step& s : steps) {
        t.add_sample(s.sample);
        BOOST_REQUIRE_EQUAL(t.current_max(), rounded(s.expected_raw));
    }
}
// Repeating the same value keeps the maximum stable, including well past
// the window size; a larger value still takes over afterwards.
BOOST_AUTO_TEST_CASE(test_equal_values) {
    constexpr size_t repeated = 42;
    utils::rolling_max_tracker t(5);

    for (int i = 0; i < 3; ++i) {
        t.add_sample(repeated);
    }
    BOOST_REQUIRE_EQUAL(t.current_max(), rounded(repeated));

    // 20 more identical samples: many more than the window of 5 holds.
    int remaining = 20;
    while (remaining-- > 0) {
        t.add_sample(repeated);
        BOOST_REQUIRE_EQUAL(t.current_max(), rounded(repeated));
    }

    t.add_sample(100);
    BOOST_REQUIRE_EQUAL(t.current_max(), rounded(100));
}
// Up-then-down staircase 1..5..1 with a window of 6: the peak (5) stays
// inside the window for the whole descent, so it remains the maximum.
BOOST_AUTO_TEST_CASE(test_staircase_pattern) {
    constexpr size_t peak = 5;
    utils::rolling_max_tracker t(6);

    // Ascend: 1, 2, 3, 4, 5.
    for (size_t s = 1; s <= peak; ++s) {
        t.add_sample(s);
    }
    BOOST_REQUIRE_EQUAL(t.current_max(), rounded(peak));

    // Descend: 4, 3, 2, 1, checking after every step.
    for (size_t s = peak - 1; s >= 1; --s) {
        t.add_sample(s);
        BOOST_REQUIRE_EQUAL(t.current_max(), rounded(peak));
    }
}
// add_sample() clamps a zero sample to 1 before taking its logarithm, so a
// tracker that has only seen zeros reports 1 (2^0), not 0.
BOOST_AUTO_TEST_CASE(test_zero_sample_clamped_to_one) {
    utils::rolling_max_tracker tracker(3);
    tracker.add_sample(0);
    // Unsigned literal: current_max() returns size_t, so avoid a
    // signed/unsigned comparison (matches the 0u used for the empty case).
    BOOST_REQUIRE_EQUAL(tracker.current_max(), 1u);
    // Filling the whole window with zeros does not change the result.
    tracker.add_sample(0);
    tracker.add_sample(0);
    BOOST_REQUIRE_EQUAL(tracker.current_max(), 1u);
}
// With a window of one, current_max() reflects only the latest sample, so
// for every v in [1, 1024] it must satisfy v <= current_max() <= 2 * v:
// never an underestimate, and never more than double.
BOOST_AUTO_TEST_CASE(test_current_max_is_upper_bound) {
    constexpr size_t last = 1024;
    utils::rolling_max_tracker t(1);
    size_t v = 1;
    while (v <= last) {
        t.add_sample(v);
        BOOST_REQUIRE_GE(t.current_max(), v);
        BOOST_REQUIRE_LE(t.current_max(), 2 * v);
        ++v;
    }
}
// Cross-checks the tracker against a brute-force maximum over the last
// `window` samples of a deterministic sequence.
BOOST_AUTO_TEST_CASE(test_sliding_window_correctness) {
    constexpr size_t window = 7;
    constexpr size_t n = 100;

    // i-th sample of a fixed sequence with values in [0, 49]. Computed on
    // demand so the test needs no container (and no <vector> include).
    const auto sample_at = [](size_t i) -> size_t {
        return (i * 37 + 13) % 50;
    };

    utils::rolling_max_tracker tracker(window);
    for (size_t i = 0; i < n; ++i) {
        tracker.add_sample(sample_at(i));

        // Reference result: maximum of the rounded values over the samples
        // with indices [start, i], i.e. the last `window` samples (or all
        // of them while fewer than `window` have been added).
        const size_t start = (i + 1 > window) ? (i + 1 - window) : 0;
        size_t expected_max = 0;
        for (size_t j = start; j <= i; ++j) {
            expected_max = std::max(expected_max, rounded(sample_at(j)));
        }
        BOOST_REQUIRE_EQUAL(tracker.current_max(), expected_max);
    }
}

View File

@@ -0,0 +1,59 @@
/*
* Copyright (C) 2025-present ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#pragma once
#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <utility>
#include <seastar/core/bitops.hh>
#include <seastar/core/circular_buffer_fixed_capacity.hh>
namespace utils {
/// Tracks the rolling maximum over the last `window_size` samples at
/// amortized O(1) cost per sample.
///
/// For efficiency, samples are stored as their log2ceil exponent rather
/// than their exact value, so current_max() returns a power of two that is
/// an upper bound on the true maximum (at most 2x larger).
///
/// Edge cases:
///  - A sample of 0 is treated as 1.
///  - A sample above the largest power of two representable in size_t is
///    treated as that power of two, since no larger bound can be reported.
///  - With `window_size == 0` no sample is retained and current_max()
///    always returns 0.
class rolling_max_tracker {
    // Largest power of two representable in size_t (only the top bit set).
    // Samples are clamped to it so that log2ceil never returns the full bit
    // width, which would make the shift in current_max() undefined and
    // would need one more buffer slot than is available.
    static constexpr size_t max_sample = ~(~size_t(0) >> 1);

    // (sequence number, exponent) pairs, oldest at the front. Exponents are
    // strictly decreasing from front to back, so the front is the maximum.
    // With samples clamped to [1, max_sample], log2ceil produces values
    // in [0, 63] for 64-bit size_t, so at most 64 entries.
    seastar::circular_buffer_fixed_capacity<std::pair<uint64_t, unsigned>, 64> _buf;
    // Sequence number that will be assigned to the next sample.
    uint64_t _seq = 0;
    size_t _window_size;

public:
    /// \param window_size number of most recent samples the maximum covers.
    explicit rolling_max_tracker(size_t window_size) noexcept
        : _window_size(window_size) {
    }

    /// Records a new sample and drops samples that left the window.
    void add_sample(size_t value) noexcept {
        // Lower clamp avoids the undefined log2ceil(0); upper clamp keeps
        // the exponent below the bit width of size_t.
        const auto v = seastar::log2ceil(std::clamp(value, size_t(1), max_sample));

        // Maintain the monotonic (decreasing) property:
        // remove all entries from the back that are <= the new value,
        // since they can never be the maximum while this entry is in the window.
        while (!_buf.empty() && _buf.back().second <= v) {
            _buf.pop_back();
        }

        _buf.emplace_back(_seq, v);
        ++_seq;

        // Remove entries that have fallen out of the window from the front.
        // The age is computed as `_seq - first` (always >= 1) instead of
        // `first + _window_size`, which could wrap for very large windows.
        // The emptiness check matters for a zero-sized window, where even
        // the entry just added is already out of the window.
        while (!_buf.empty() && _seq - _buf.front().first > _window_size) {
            _buf.pop_front();
        }
    }

    /// Returns a power-of-two upper bound on the maximum sample in the
    /// current window, or 0 if no sample is being tracked.
    size_t current_max() const noexcept {
        return _buf.empty() ? 0 : size_t(1) << _buf.cbegin()->second;
    }
};
} // namespace utils