From 7fd31088f2a2eba214a5e4e95babe0b4852cd280 Mon Sep 17 00:00:00 2001 From: Glauber Costa Date: Wed, 14 Mar 2018 20:16:19 -0400 Subject: [PATCH] large_bitset/bloom filter: add preemption points in loops SSTables that contain many keys - a common case with small partitions in long lived nodes - can generate filters that are quite large. I have seen stalls over 80ms when reading a filter that was the result of a 6h write load of very small keys after nodetool compact (filter was in the 100s of MB) Similar care should be taken when creating the filter, as if the estimated number of partitions is big, the resulting large_bitset can be quite big as well. If we treat the i_filter.hh and large_bitset.hh interfaces as truly generic, then maybe we should have an in_thread version along with a common version. But the bloom filter is the only user for both and even if that changes in the future, it is still a good idea to run something with a massive loop in a thread. So for simplicity, I am just asserting that we are on a thread to avoid surprises, and inserting preemption points in the loops. Signed-off-by: Glauber Costa --- utils/i_filter.cc | 5 +++++ utils/large_bitset.cc | 13 +++++++++++++ utils/large_bitset.hh | 19 +++++++++++++++++++ 3 files changed, 37 insertions(+) diff --git a/utils/i_filter.cc b/utils/i_filter.cc index 088f08d5f0..a4a697537d 100644 --- a/utils/i_filter.cc +++ b/utils/i_filter.cc @@ -23,11 +23,14 @@ #include "log.hh" #include "bloom_filter.hh" #include "bloom_calculations.hh" +#include namespace utils { static logging::logger filterlog("bloom_filter"); filter_ptr i_filter::get_filter(int64_t num_elements, double max_false_pos_probability) { + assert(seastar::thread::running_in_thread()); + if (max_false_pos_probability > 1.0) { throw std::invalid_argument(sprint("Invalid probability %f: must be lower than 1.0", max_false_pos_probability)); } @@ -42,6 +45,8 @@ filter_ptr i_filter::get_filter(int64_t num_elements, double max_false_pos_proba } filter_ptr i_filter::get_filter(int64_t num_elements, int target_buckets_per_elem) { + assert(seastar::thread::running_in_thread()); + int max_buckets_per_element = std::max(1, bloom_calculations::max_buckets_per_element(num_elements)); int buckets_per_element = std::min(target_buckets_per_elem, max_buckets_per_element); diff --git a/utils/large_bitset.cc b/utils/large_bitset.cc index 202af20f3b..b770899fa1 100644 --- a/utils/large_bitset.cc +++ b/utils/large_bitset.cc @@ -22,9 +22,14 @@ #include "large_bitset.hh" #include #include +#include #include "seastarx.hh" +using namespace seastar; + large_bitset::large_bitset(size_t nr_bits) : _nr_bits(nr_bits) { + assert(thread::running_in_thread()); + auto nr_blocks = align_up(nr_bits, bits_per_block()) / bits_per_block(); _storage.reserve(nr_blocks); size_t nr_ints = align_up(nr_bits, bits_per_int()) / bits_per_int(); @@ -33,16 +38,24 @@ large_bitset::large_bitset(size_t nr_bits) : _nr_bits(nr_bits) { _storage.push_back(std::make_unique(now)); std::fill_n(_storage.back().get(), now, 0); nr_ints -= now; + if (need_preempt()) { + thread::yield(); + } } } void large_bitset::clear() { + assert(thread::running_in_thread()); + size_t nr_ints = align_up(_nr_bits, bits_per_int()) / bits_per_int(); auto bp = _storage.begin(); while (nr_ints) { auto now = std::min(ints_per_block(), nr_ints); std::fill_n(bp++->get(), now, 0); nr_ints -= now; + if (need_preempt()) { + thread::yield(); + } } } diff --git a/utils/large_bitset.hh b/utils/large_bitset.hh index e3dd0f8796..019374d3aa 100644 --- a/utils/large_bitset.hh +++ b/utils/large_bitset.hh @@ -29,6 +29,10 @@ #include #include #include +#include +#include + +using namespace seastar; class large_bitset { static constexpr size_t block_size() { return 128 * 1024; } @@ -92,6 +96,8 @@ public: template size_t large_bitset::load(IntegerIterator start, IntegerIterator finish, size_t position) { + assert(thread::running_in_thread()); + using input_int_type = typename std::iterator_traits::value_type; if (position % bits_per_int() == 0 && sizeof(input_int_type) == sizeof(int_type)) { auto idx = position; @@ -104,6 +110,9 @@ large_bitset::load(IntegerIterator start, IntegerIterator finish, size_t positio start += now; ++idx1; idx2 = 0; + if (need_preempt()) { + thread::yield(); + } } } else { while (start != finish) { @@ -116,6 +125,9 @@ large_bitset::load(IntegerIterator start, IntegerIterator finish, size_t positio } bitmask >>= 1; ++position; + if (need_preempt()) { + thread::yield(); + } } } } @@ -125,6 +137,7 @@ large_bitset::load(IntegerIterator start, IntegerIterator finish, size_t positio template IntegerIterator large_bitset::save(IntegerIterator out, size_t position, size_t n) { + assert(thread::running_in_thread()); n = std::min(n, size() - position); using output_int_type = typename std::iterator_traits::value_type; if (position % bits_per_int() == 0 @@ -141,6 +154,9 @@ large_bitset::save(IntegerIterator out, size_t position, size_t n) { ++idx1; idx2 = 0; n_ints -= now; + if (need_preempt()) { + thread::yield(); + } } } else { output_int_type result = 0; @@ -156,6 +172,9 @@ large_bitset::save(IntegerIterator out, size_t position, size_t n) { result = 0; bitpos = 0; } + if (need_preempt()) { + thread::yield(); + } } if (bitpos) { *out = result;