diff --git a/utils/i_filter.cc b/utils/i_filter.cc index 088f08d5f0..a4a697537d 100644 --- a/utils/i_filter.cc +++ b/utils/i_filter.cc @@ -23,11 +23,14 @@ #include "log.hh" #include "bloom_filter.hh" #include "bloom_calculations.hh" +#include namespace utils { static logging::logger filterlog("bloom_filter"); filter_ptr i_filter::get_filter(int64_t num_elements, double max_false_pos_probability) { + assert(seastar::thread::running_in_thread()); + if (max_false_pos_probability > 1.0) { throw std::invalid_argument(sprint("Invalid probability %f: must be lower than 1.0", max_false_pos_probability)); } @@ -42,6 +45,8 @@ filter_ptr i_filter::get_filter(int64_t num_elements, double max_false_pos_proba } filter_ptr i_filter::get_filter(int64_t num_elements, int target_buckets_per_elem) { + assert(seastar::thread::running_in_thread()); + int max_buckets_per_element = std::max(1, bloom_calculations::max_buckets_per_element(num_elements)); int buckets_per_element = std::min(target_buckets_per_elem, max_buckets_per_element); diff --git a/utils/large_bitset.cc b/utils/large_bitset.cc index 202af20f3b..b770899fa1 100644 --- a/utils/large_bitset.cc +++ b/utils/large_bitset.cc @@ -22,9 +22,14 @@ #include "large_bitset.hh" #include #include +#include #include "seastarx.hh" +using namespace seastar; + large_bitset::large_bitset(size_t nr_bits) : _nr_bits(nr_bits) { + assert(thread::running_in_thread()); + auto nr_blocks = align_up(nr_bits, bits_per_block()) / bits_per_block(); _storage.reserve(nr_blocks); size_t nr_ints = align_up(nr_bits, bits_per_int()) / bits_per_int(); @@ -33,16 +38,24 @@ large_bitset::large_bitset(size_t nr_bits) : _nr_bits(nr_bits) { _storage.push_back(std::make_unique(now)); std::fill_n(_storage.back().get(), now, 0); nr_ints -= now; + if (need_preempt()) { + thread::yield(); + } } } void large_bitset::clear() { + assert(thread::running_in_thread()); + size_t nr_ints = align_up(_nr_bits, bits_per_int()) / bits_per_int(); auto bp = _storage.begin(); while (nr_ints) { auto now = std::min(ints_per_block(), nr_ints); std::fill_n(bp++->get(), now, 0); nr_ints -= now; + if (need_preempt()) { + thread::yield(); + } } } diff --git a/utils/large_bitset.hh b/utils/large_bitset.hh index e3dd0f8796..019374d3aa 100644 --- a/utils/large_bitset.hh +++ b/utils/large_bitset.hh @@ -29,6 +29,10 @@ #include #include #include +#include +#include + +using namespace seastar; class large_bitset { static constexpr size_t block_size() { return 128 * 1024; } @@ -92,6 +96,8 @@ public: template size_t large_bitset::load(IntegerIterator start, IntegerIterator finish, size_t position) { + assert(thread::running_in_thread()); + using input_int_type = typename std::iterator_traits::value_type; if (position % bits_per_int() == 0 && sizeof(input_int_type) == sizeof(int_type)) { auto idx = position; @@ -104,6 +110,9 @@ large_bitset::load(IntegerIterator start, IntegerIterator finish, size_t positio start += now; ++idx1; idx2 = 0; + if (need_preempt()) { + thread::yield(); + } } } else { while (start != finish) { @@ -116,6 +125,9 @@ large_bitset::load(IntegerIterator start, IntegerIterator finish, size_t positio } bitmask >>= 1; ++position; + if (need_preempt()) { + thread::yield(); + } } } } @@ -125,6 +137,7 @@ large_bitset::load(IntegerIterator start, IntegerIterator finish, size_t positio template IntegerIterator large_bitset::save(IntegerIterator out, size_t position, size_t n) { + assert(thread::running_in_thread()); n = std::min(n, size() - position); using output_int_type = typename std::iterator_traits::value_type; if (position % bits_per_int() == 0 @@ -141,6 +154,9 @@ large_bitset::save(IntegerIterator out, size_t position, size_t n) { ++idx1; idx2 = 0; n_ints -= now; + if (need_preempt()) { + thread::yield(); + } } } else { output_int_type result = 0; @@ -156,6 +172,9 @@ large_bitset::save(IntegerIterator out, size_t position, size_t n) { result = 0; bitpos = 0; } + if (need_preempt()) { + thread::yield(); + } } if (bitpos) { *out = result;