diff --git a/db/config.cc b/db/config.cc index 43b365c4e0..90ff40e414 100644 --- a/db/config.cc +++ b/db/config.cc @@ -985,6 +985,7 @@ db::config::config(std::shared_ptr exts) , unspooled_dirty_soft_limit(this, "unspooled_dirty_soft_limit", value_status::Used, 0.6, "Soft limit of unspooled dirty memory expressed as a portion of the hard limit.") , sstable_summary_ratio(this, "sstable_summary_ratio", value_status::Used, 0.0005, "Enforces that 1 byte of summary is written for every N (2000 by default)" "bytes written to data file. Value must be between 0 and 1.") + , components_memory_reclaim_threshold(this, "components_memory_reclaim_threshold", liveness::LiveUpdate, value_status::Used, .1, "Ratio of available memory for all in-memory components of SSTables in a shard beyond which the memory will be reclaimed from components until it falls back under the threshold. Currently, this limit is only enforced for bloom filters.") , large_memory_allocation_warning_threshold(this, "large_memory_allocation_warning_threshold", value_status::Used, size_t(1) << 20, "Warn about memory allocations above this size; set to zero to disable.") , enable_deprecated_partitioners(this, "enable_deprecated_partitioners", value_status::Used, false, "Enable the byteordered and random partitioners. 
These partitioners are deprecated and will be removed in a future version.") , enable_keyspace_column_family_metrics(this, "enable_keyspace_column_family_metrics", value_status::Used, false, "Enable per keyspace and per column family metrics reporting.") diff --git a/db/config.hh b/db/config.hh index 4c30ce425a..6fb9cdadc6 100644 --- a/db/config.hh +++ b/db/config.hh @@ -365,6 +365,7 @@ public: named_value murmur3_partitioner_ignore_msb_bits; named_value unspooled_dirty_soft_limit; named_value sstable_summary_ratio; + named_value components_memory_reclaim_threshold; named_value large_memory_allocation_warning_threshold; named_value enable_deprecated_partitioners; named_value enable_keyspace_column_family_metrics; diff --git a/sstables/sstables.cc b/sstables/sstables.cc index 58dcd7b627..a28b210a0d 100644 --- a/sstables/sstables.cc +++ b/sstables/sstables.cc @@ -1321,6 +1321,9 @@ future<> sstable::open_data(sstable_open_config cfg) noexcept { } _open_mode.emplace(open_flags::ro); _stats.on_open_for_reading(); + + _total_reclaimable_memory.reset(); + _manager.increment_total_reclaimable_memory_and_maybe_reclaim(this); } future<> sstable::update_info_for_opened_data(sstable_open_config cfg) { @@ -1396,6 +1399,31 @@ void sstable::write_filter() { write_simple(filter_ref); } +size_t sstable::total_reclaimable_memory_size() const { + if (!_total_reclaimable_memory) { + _total_reclaimable_memory = _components->filter ? 
_components->filter->memory_size() : 0; + } + + return _total_reclaimable_memory.value(); +} + +size_t sstable::reclaim_memory_from_components() { + size_t total_memory_reclaimed = 0; + + if (_components->filter) { + auto filter_memory_size = _components->filter->memory_size(); + if (filter_memory_size > 0) { + // discard it from memory by replacing it with an always present variant + _components->filter = std::make_unique(); + _recognized_components.erase(component_type::Filter); + total_memory_reclaimed += filter_memory_size; + } + } + + _total_reclaimable_memory.reset(); + return total_memory_reclaimed; +} + // This interface is only used during tests, snapshot loading and early initialization. // No need to set tunable priorities for it. future<> sstable::load(const dht::sharder& sharder, sstable_open_config cfg) noexcept { @@ -1431,7 +1459,10 @@ future<> sstable::load(sstables::foreign_sstable_open_info info) noexcept { validate_min_max_metadata(); validate_max_local_deletion_time(); validate_partitioner(); - return update_info_for_opened_data(); + return update_info_for_opened_data().then([this]() { + _total_reclaimable_memory.reset(); + _manager.increment_total_reclaimable_memory_and_maybe_reclaim(this); + }); }); } diff --git a/sstables/sstables.hh b/sstables/sstables.hh index bda4cd6ba8..f5ce4d3785 100644 --- a/sstables/sstables.hh +++ b/sstables/sstables.hh @@ -588,6 +588,11 @@ private: // information in their scylla metadata. std::optional _large_data_stats; sstring _origin; + + // Total reclaimable memory from all the components of the SSTable. + // It is initialized to 0 to prevent the sstables manager from reclaiming memory + // from the components before the SSTable has been fully loaded. 
+ mutable std::optional _total_reclaimable_memory{0}; public: bool has_component(component_type f) const; sstables_manager& manager() { return _manager; } @@ -679,6 +684,12 @@ private: future<> create_data() noexcept; + // Return the total reclaimable memory in this SSTable + size_t total_reclaimable_memory_size() const; + // Reclaim memory from the components back to the system. + // Note that only bloom filters are reclaimable. + size_t reclaim_memory_from_components(); + public: // Finds first position_in_partition in a given partition. // If reversed is false, then the first position is actually the first row (can be the static one). diff --git a/sstables/sstables_manager.cc b/sstables/sstables_manager.cc index 844a5605a8..87a1a1a4c5 100644 --- a/sstables/sstables_manager.cc +++ b/sstables/sstables_manager.cc @@ -25,6 +25,7 @@ logging::logger smlogger("sstables_manager"); sstables_manager::sstables_manager( sstring name, db::large_data_handler& large_data_handler, const db::config& dbcfg, gms::feature_service& feat, cache_tracker& ct, size_t available_memory, directory_semaphore& dir_sem, noncopyable_function&& resolve_host_id, storage_manager* shared) : _storage(shared) + , _available_memory(available_memory) , _large_data_handler(large_data_handler), _db_config(dbcfg), _features(feat), _cache_tracker(ct) , _sstable_metadata_concurrency_sem( max_count_sstable_metadata_concurrent_reads, @@ -146,11 +147,35 @@ sstable_writer_config sstables_manager::configure_writer(sstring origin) const { return cfg; } +void sstables_manager::increment_total_reclaimable_memory_and_maybe_reclaim(sstable* sst) { + _total_reclaimable_memory += sst->total_reclaimable_memory_size(); + + size_t memory_reclaim_threshold = _available_memory * _db_config.components_memory_reclaim_threshold(); + if (_total_reclaimable_memory <= memory_reclaim_threshold) { + // total memory used is within limit; no need to reclaim. + return; + } + + // Memory consumption has crossed threshold. 
Reclaim from the SSTable that + // has the most reclaimable memory to get the total consumption under limit. + auto sst_with_max_memory = std::max_element(_active.begin(), _active.end(), [](const sstable& sst1, const sstable& sst2) { + return sst1.total_reclaimable_memory_size() < sst2.total_reclaimable_memory_size(); + }); + + auto memory_reclaimed = sst_with_max_memory->reclaim_memory_from_components(); + _total_memory_reclaimed += memory_reclaimed; + _total_reclaimable_memory -= memory_reclaimed; + smlogger.info("Reclaimed {} bytes of memory from SSTable components. Total memory reclaimed so far is {} bytes", memory_reclaimed, _total_memory_reclaimed); +} + void sstables_manager::add(sstable* sst) { _active.push_back(*sst); } void sstables_manager::deactivate(sstable* sst) { + // Remove SSTable from the reclaimable memory tracking + _total_reclaimable_memory -= sst->total_reclaimable_memory_size(); + // At this point, sst has a reference count of zero, since we got here from // lw_shared_ptr_deleter::dispose(). 
_active.erase(_active.iterator_to(*sst)); diff --git a/sstables/sstables_manager.hh b/sstables/sstables_manager.hh index e29578698d..cb4e45152b 100644 --- a/sstables/sstables_manager.hh +++ b/sstables/sstables_manager.hh @@ -79,6 +79,7 @@ class sstables_manager { boost::intrusive::constant_time_size>; private: storage_manager* _storage; + size_t _available_memory; db::large_data_handler& _large_data_handler; const db::config& _db_config; gms::feature_service& _features; @@ -96,6 +97,11 @@ private: list_type _active; list_type _undergoing_close; + // Total reclaimable memory used by components of sstables in _active list + size_t _total_reclaimable_memory{0}; + // Total memory reclaimed so far across all sstables + size_t _total_memory_reclaimed{0}; + bool _closing = false; promise<> _done; cache_tracker& _cache_tracker; @@ -183,11 +189,19 @@ private: static constexpr size_t max_count_sstable_metadata_concurrent_reads{10}; // Allow at most 10% of memory to be filled with such reads. size_t max_memory_sstable_metadata_concurrent_reads(size_t available_memory) { return available_memory * 0.1; } + + // Increment the _total_reclaimable_memory with the new SSTable's reclaimable + // memory and if the total memory usage exceeds the pre-defined threshold, + // reclaim it from the SSTable that has the most reclaimable memory. 
+ void increment_total_reclaimable_memory_and_maybe_reclaim(sstable* sst); private: db::large_data_handler& get_large_data_handler() const { return _large_data_handler; } friend class sstable; + + // Allow testing private methods/variables via test_env_sstables_manager + friend class test_env_sstables_manager; }; } // namespace sstables diff --git a/test/boost/sstable_datafile_test.cc b/test/boost/sstable_datafile_test.cc index 2f9398f208..c978735e3f 100644 --- a/test/boost/sstable_datafile_test.cc +++ b/test/boost/sstable_datafile_test.cc @@ -3176,3 +3176,70 @@ SEASTAR_TEST_CASE(test_sstable_set_predicate) { } }); } + +SEASTAR_TEST_CASE(test_sstable_reclaim_memory_from_components) { + return test_env::do_with_async([] (test_env& env) { + simple_schema ss; + auto schema_ptr = ss.schema(); + auto sst = env.make_sstable(schema_ptr); + + // create a bloom filter + auto sst_test = sstables::test(sst); + sst_test.create_bloom_filter(100); + auto total_reclaimable_memory = sst_test.total_reclaimable_memory_size(); + + // Test sstable::reclaim_memory_from_components() : + BOOST_REQUIRE_EQUAL(sst_test.reclaim_memory_from_components(), total_reclaimable_memory); + BOOST_REQUIRE_EQUAL(sst_test.total_reclaimable_memory_size(), 0); + BOOST_REQUIRE_EQUAL(sst->filter_memory_size(), 0); + }); +} + +std::pair create_sstable_with_bloom_filter(test_env& env, test_env_sstables_manager& sst_mgr, schema_ptr sptr, uint64_t estimated_partitions) { + auto sst = env.make_sstable(sptr); + sstables::test(sst).create_bloom_filter(estimated_partitions); + auto sst_bf_memory = sst->filter_memory_size(); + sst_mgr.increment_total_reclaimable_memory_and_maybe_reclaim(sst.get()); + return {sst, sst_bf_memory}; +} + +SEASTAR_TEST_CASE(test_sstable_manager_auto_reclaim_under_pressure) { + return test_env::do_with_async([] (test_env& env) { + simple_schema ss; + auto schema_ptr = ss.schema(); + + auto& sst_mgr = env.manager(); + + // Verify nothing it reclaimed when under threshold + auto [sst1, 
sst1_bf_memory] = create_sstable_with_bloom_filter(env, sst_mgr, schema_ptr, 70); + BOOST_REQUIRE_EQUAL(sst1->filter_memory_size(), sst1_bf_memory); + BOOST_REQUIRE_EQUAL(sst_mgr.get_total_memory_reclaimed(), 0); + + auto [sst2, sst2_bf_memory] = create_sstable_with_bloom_filter(env, sst_mgr, schema_ptr, 20); + // Confirm reclaim was still not triggered + BOOST_REQUIRE_EQUAL(sst1->filter_memory_size(), sst1_bf_memory); + BOOST_REQUIRE_EQUAL(sst2->filter_memory_size(), sst2_bf_memory); + BOOST_REQUIRE_EQUAL(sst_mgr.get_total_memory_reclaimed(), 0); + + // Verify manager reclaims from the largest sst when the total usage crosses thresold. + auto [sst3, sst3_bf_memory] = create_sstable_with_bloom_filter(env, sst_mgr, schema_ptr, 50); + // sst1 has the most reclaimable memory + BOOST_REQUIRE_EQUAL(sst1->filter_memory_size(), 0); + BOOST_REQUIRE_EQUAL(sst2->filter_memory_size(), sst2_bf_memory); + BOOST_REQUIRE_EQUAL(sst3->filter_memory_size(), sst3_bf_memory); + BOOST_REQUIRE_EQUAL(sst_mgr.get_total_memory_reclaimed(), sst1_bf_memory); + + // Reclaim should also work on the latest sst being added + auto [sst4, sst4_bf_memory] = create_sstable_with_bloom_filter(env, sst_mgr, schema_ptr, 100); + // sst4 should have been reclaimed + BOOST_REQUIRE_EQUAL(sst1->filter_memory_size(), 0); + BOOST_REQUIRE_EQUAL(sst2->filter_memory_size(), sst2_bf_memory); + BOOST_REQUIRE_EQUAL(sst3->filter_memory_size(), sst3_bf_memory); + BOOST_REQUIRE_EQUAL(sst4->filter_memory_size(), 0); + BOOST_REQUIRE_EQUAL(sst_mgr.get_total_memory_reclaimed(), sst1_bf_memory + sst4_bf_memory); + }, { + // limit available memory to the sstables_manager to test reclaiming. + // this will set the reclaim threshold to 100 bytes. 
+ .available_memory = 1000 + }); +} diff --git a/test/cql-pytest/test_bloom_filter.py b/test/cql-pytest/test_bloom_filter.py index a82602ac1e..b9ed62bba3 100644 --- a/test/cql-pytest/test_bloom_filter.py +++ b/test/cql-pytest/test_bloom_filter.py @@ -5,14 +5,20 @@ import pytest import rest_api import nodetool -from util import new_test_table +from util import new_test_table, config_value_context from cassandra.protocol import ConfigurationException +# Disable component memory reclamation by setting the threshold to max value +@pytest.fixture(scope="module") +def disable_component_memory_reclaim(cql): + with config_value_context(cql, 'components_memory_reclaim_threshold', '1'): + yield + # Test inserts `N` rows into table, flushes it # and tries to read `M` non-existing keys. # Then bloom filter's false-positive ratio is checked. @pytest.mark.parametrize("N,M,fp_chance", [(500, 1000, 0.1)]) -def test_bloom_filter(scylla_only, cql, test_keyspace, N, M, fp_chance): +def test_bloom_filter(scylla_only, cql, test_keyspace, disable_component_memory_reclaim, N, M, fp_chance): def run_test(cql, test_keyspace, N, M, fp_chance): with new_test_table(cql, test_keyspace, "a int PRIMARY KEY", f"WITH bloom_filter_fp_chance = {fp_chance}") as table: diff --git a/test/lib/sstable_test_env.hh b/test/lib/sstable_test_env.hh index 909f8380b9..b4807e3475 100644 --- a/test/lib/sstable_test_env.hh +++ b/test/lib/sstable_test_env.hh @@ -45,6 +45,14 @@ public: void set_promoted_index_block_size(size_t promoted_index_block_size) { _promoted_index_block_size = promoted_index_block_size; } + + void increment_total_reclaimable_memory_and_maybe_reclaim(sstable *sst) { + sstables_manager::increment_total_reclaimable_memory_and_maybe_reclaim(sst); + } + + size_t get_total_memory_reclaimed() { + return _total_memory_reclaimed; + } }; class test_env_compaction_manager { @@ -67,6 +75,7 @@ struct test_env_config { db::large_data_handler* large_data_handler = nullptr; data_dictionary::storage_options 
    // Replaces the SSTable's filter component with the filter returned by
    // utils::i_filter::get_filter() for the given partition estimate and
    // false-positive probability, then resets the cached reclaimable-memory
    // size so that it is recomputed on the next query.
    void create_bloom_filter(uint64_t estimated_partitions, double max_false_pos_prob = 0.1) {
        _sst->_components->filter = utils::i_filter::get_filter(estimated_partitions, max_false_pos_prob, utils::filter_format::m_format);
        _sst->_total_reclaimable_memory.reset();
    }

    // Test-only access to sstable::total_reclaimable_memory_size().
    size_t total_reclaimable_memory_size() const {
        return _sst->total_reclaimable_memory_size();
    }

    // Test-only access to sstable::reclaim_memory_from_components();
    // returns whatever the wrapped call returns.
    size_t reclaim_memory_from_components() {
        return _sst->reclaim_memory_from_components();
    }