mirror of
https://github.com/scylladb/scylladb.git
synced 2026-05-29 19:21:01 +00:00
large_bitset/bloom filter: add preemption points in loops
SSTables that contain many keys - a common case with small partitions in long lived nodes - can generate filters that are quite large. I have seen stalls over 80ms when reading a filter that was the result of a 6h write load of very small keys after nodetool compact (filter was in the 100s of MB) Similar care should be taken when creating the filter, as if the estimated number of partitions is big, the resulting large_bitset can be quite big as well. If we treat the i_filter.hh and large_bitset.hh interfaces as truly generic, then maybe we should have an in_thread version along with a common version. But the bloom filter is the only user for both and even if that changes in the future, it is still a good idea to run something with a massive loop in a thread. So for simplicity, I am just asserting that we are on a thread to avoid surprises, and inserting preemption points in the loops. Signed-off-by: Glauber Costa <glauber@scylladb.com>
This commit is contained in:
@@ -23,11 +23,14 @@
|
||||
#include "log.hh"
|
||||
#include "bloom_filter.hh"
|
||||
#include "bloom_calculations.hh"
|
||||
#include <seastar/core/thread.hh>
|
||||
|
||||
namespace utils {
|
||||
static logging::logger filterlog("bloom_filter");
|
||||
|
||||
filter_ptr i_filter::get_filter(int64_t num_elements, double max_false_pos_probability) {
|
||||
assert(seastar::thread::running_in_thread());
|
||||
|
||||
if (max_false_pos_probability > 1.0) {
|
||||
throw std::invalid_argument(sprint("Invalid probability %f: must be lower than 1.0", max_false_pos_probability));
|
||||
}
|
||||
@@ -42,6 +45,8 @@ filter_ptr i_filter::get_filter(int64_t num_elements, double max_false_pos_proba
|
||||
}
|
||||
|
||||
filter_ptr i_filter::get_filter(int64_t num_elements, int target_buckets_per_elem) {
|
||||
assert(seastar::thread::running_in_thread());
|
||||
|
||||
int max_buckets_per_element = std::max(1, bloom_calculations::max_buckets_per_element(num_elements));
|
||||
int buckets_per_element = std::min(target_buckets_per_elem, max_buckets_per_element);
|
||||
|
||||
|
||||
@@ -22,9 +22,14 @@
|
||||
#include "large_bitset.hh"
|
||||
#include <algorithm>
|
||||
#include <seastar/core/align.hh>
|
||||
#include <seastar/core/thread.hh>
|
||||
#include "seastarx.hh"
|
||||
|
||||
using namespace seastar;
|
||||
|
||||
large_bitset::large_bitset(size_t nr_bits) : _nr_bits(nr_bits) {
|
||||
assert(thread::running_in_thread());
|
||||
|
||||
auto nr_blocks = align_up(nr_bits, bits_per_block()) / bits_per_block();
|
||||
_storage.reserve(nr_blocks);
|
||||
size_t nr_ints = align_up(nr_bits, bits_per_int()) / bits_per_int();
|
||||
@@ -33,16 +38,24 @@ large_bitset::large_bitset(size_t nr_bits) : _nr_bits(nr_bits) {
|
||||
_storage.push_back(std::make_unique<int_type[]>(now));
|
||||
std::fill_n(_storage.back().get(), now, 0);
|
||||
nr_ints -= now;
|
||||
if (need_preempt()) {
|
||||
thread::yield();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void
|
||||
large_bitset::clear() {
|
||||
assert(thread::running_in_thread());
|
||||
|
||||
size_t nr_ints = align_up(_nr_bits, bits_per_int()) / bits_per_int();
|
||||
auto bp = _storage.begin();
|
||||
while (nr_ints) {
|
||||
auto now = std::min(ints_per_block(), nr_ints);
|
||||
std::fill_n(bp++->get(), now, 0);
|
||||
nr_ints -= now;
|
||||
if (need_preempt()) {
|
||||
thread::yield();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -29,6 +29,10 @@
|
||||
#include <limits>
|
||||
#include <iterator>
|
||||
#include <algorithm>
|
||||
#include <seastar/core/thread.hh>
|
||||
#include <seastar/core/preempt.hh>
|
||||
|
||||
using namespace seastar;
|
||||
|
||||
class large_bitset {
|
||||
static constexpr size_t block_size() { return 128 * 1024; }
|
||||
@@ -92,6 +96,8 @@ public:
|
||||
template <typename IntegerIterator>
|
||||
size_t
|
||||
large_bitset::load(IntegerIterator start, IntegerIterator finish, size_t position) {
|
||||
assert(thread::running_in_thread());
|
||||
|
||||
using input_int_type = typename std::iterator_traits<IntegerIterator>::value_type;
|
||||
if (position % bits_per_int() == 0 && sizeof(input_int_type) == sizeof(int_type)) {
|
||||
auto idx = position;
|
||||
@@ -104,6 +110,9 @@ large_bitset::load(IntegerIterator start, IntegerIterator finish, size_t positio
|
||||
start += now;
|
||||
++idx1;
|
||||
idx2 = 0;
|
||||
if (need_preempt()) {
|
||||
thread::yield();
|
||||
}
|
||||
}
|
||||
} else {
|
||||
while (start != finish) {
|
||||
@@ -116,6 +125,9 @@ large_bitset::load(IntegerIterator start, IntegerIterator finish, size_t positio
|
||||
}
|
||||
bitmask >>= 1;
|
||||
++position;
|
||||
if (need_preempt()) {
|
||||
thread::yield();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -125,6 +137,7 @@ large_bitset::load(IntegerIterator start, IntegerIterator finish, size_t positio
|
||||
template <typename IntegerIterator>
|
||||
IntegerIterator
|
||||
large_bitset::save(IntegerIterator out, size_t position, size_t n) {
|
||||
assert(thread::running_in_thread());
|
||||
n = std::min(n, size() - position);
|
||||
using output_int_type = typename std::iterator_traits<IntegerIterator>::value_type;
|
||||
if (position % bits_per_int() == 0
|
||||
@@ -141,6 +154,9 @@ large_bitset::save(IntegerIterator out, size_t position, size_t n) {
|
||||
++idx1;
|
||||
idx2 = 0;
|
||||
n_ints -= now;
|
||||
if (need_preempt()) {
|
||||
thread::yield();
|
||||
}
|
||||
}
|
||||
} else {
|
||||
output_int_type result = 0;
|
||||
@@ -156,6 +172,9 @@ large_bitset::save(IntegerIterator out, size_t position, size_t n) {
|
||||
result = 0;
|
||||
bitpos = 0;
|
||||
}
|
||||
if (need_preempt()) {
|
||||
thread::yield();
|
||||
}
|
||||
}
|
||||
if (bitpos) {
|
||||
*out = result;
|
||||
|
||||
Reference in New Issue
Block a user