Merge 'Track and limit memory used by bloom filters' from Lakshmi Narayanan Sreethar

Added support to track and limit the memory usage by sstable components. A reclaimable component of an SSTable is one from which memory can be reclaimed. SSTables and their managers now track such reclaimable memory and limit the component memory usage accordingly. A new configuration variable defines the memory reclaim threshold. If the total memory of the reclaimable components exceeds this limit, memory will be reclaimed to keep the usage under the limit. This PR considers only the bloom filters as reclaimable and adds support to track and limit them as required.

The feature can be manually verified by doing the following :
1. run a single-node single-shard 1GB cluster
2. create a table with bloom-filter-false-positive-chance of 0.001 (to intentionally cause large bloom filter)
3. populate with tiny partitions
4. watch the bloom filter metrics get capped at 100MB

The default value of the `components_memory_reclaim_threshold` config variable which controls the reclamation process is `.1`. This can also be reduced further during manual tests to easily hit the threshold and verify the feature.

Fixes #17747

Closes scylladb/scylladb#17771

* github.com:scylladb/scylladb:
  test_bloom_filter.py: disable reclaiming memory from components
  sstable_datafile_test: add tests to verify auto reclamation of components
  test/lib: allow overriding available memory via test_env_config
  sstables_manager: support reclaiming memory from components
  sstables_manager: store available memory size
  sstables_manager: add variable to track component memory usage
  db/config: add a new variable to limit memory used by table components
  sstable_datafile_test: add testcase to verify reclamation from sstables
  sstables: support reclaiming memory from components
This commit is contained in:
Botond Dénes
2024-04-02 10:40:52 +03:00
11 changed files with 182 additions and 4 deletions

View File

@@ -985,6 +985,7 @@ db::config::config(std::shared_ptr<db::extensions> exts)
, unspooled_dirty_soft_limit(this, "unspooled_dirty_soft_limit", value_status::Used, 0.6, "Soft limit of unspooled dirty memory expressed as a portion of the hard limit.")
, sstable_summary_ratio(this, "sstable_summary_ratio", value_status::Used, 0.0005, "Enforces that 1 byte of summary is written for every N (2000 by default)"
"bytes written to data file. Value must be between 0 and 1.")
, components_memory_reclaim_threshold(this, "components_memory_reclaim_threshold", liveness::LiveUpdate, value_status::Used, .1, "Ratio of available memory for all in-memory components of SSTables in a shard beyond which the memory will be reclaimed from components until it falls back under the threshold. Currently, this limit is only enforced for bloom filters.")
, large_memory_allocation_warning_threshold(this, "large_memory_allocation_warning_threshold", value_status::Used, size_t(1) << 20, "Warn about memory allocations above this size; set to zero to disable.")
, enable_deprecated_partitioners(this, "enable_deprecated_partitioners", value_status::Used, false, "Enable the byteordered and random partitioners. These partitioners are deprecated and will be removed in a future version.")
, enable_keyspace_column_family_metrics(this, "enable_keyspace_column_family_metrics", value_status::Used, false, "Enable per keyspace and per column family metrics reporting.")

View File

@@ -365,6 +365,7 @@ public:
named_value<unsigned> murmur3_partitioner_ignore_msb_bits;
named_value<double> unspooled_dirty_soft_limit;
named_value<double> sstable_summary_ratio;
named_value<double> components_memory_reclaim_threshold;
named_value<size_t> large_memory_allocation_warning_threshold;
named_value<bool> enable_deprecated_partitioners;
named_value<bool> enable_keyspace_column_family_metrics;

View File

@@ -1321,6 +1321,9 @@ future<> sstable::open_data(sstable_open_config cfg) noexcept {
}
_open_mode.emplace(open_flags::ro);
_stats.on_open_for_reading();
_total_reclaimable_memory.reset();
_manager.increment_total_reclaimable_memory_and_maybe_reclaim(this);
}
future<> sstable::update_info_for_opened_data(sstable_open_config cfg) {
@@ -1396,6 +1399,31 @@ void sstable::write_filter() {
write_simple<component_type::Filter>(filter_ref);
}
size_t sstable::total_reclaimable_memory_size() const {
if (!_total_reclaimable_memory) {
_total_reclaimable_memory = _components->filter ? _components->filter->memory_size() : 0;
}
return _total_reclaimable_memory.value();
}
size_t sstable::reclaim_memory_from_components() {
size_t total_memory_reclaimed = 0;
if (_components->filter) {
auto filter_memory_size = _components->filter->memory_size();
if (filter_memory_size > 0) {
// discard it from memory by replacing it with an always present variant
_components->filter = std::make_unique<utils::filter::always_present_filter>();
_recognized_components.erase(component_type::Filter);
total_memory_reclaimed += filter_memory_size;
}
}
_total_reclaimable_memory.reset();
return total_memory_reclaimed;
}
// This interface is only used during tests, snapshot loading and early initialization.
// No need to set tunable priorities for it.
future<> sstable::load(const dht::sharder& sharder, sstable_open_config cfg) noexcept {
@@ -1431,7 +1459,10 @@ future<> sstable::load(sstables::foreign_sstable_open_info info) noexcept {
validate_min_max_metadata();
validate_max_local_deletion_time();
validate_partitioner();
return update_info_for_opened_data();
return update_info_for_opened_data().then([this]() {
_total_reclaimable_memory.reset();
_manager.increment_total_reclaimable_memory_and_maybe_reclaim(this);
});
});
}

View File

@@ -588,6 +588,11 @@ private:
// information in their scylla metadata.
std::optional<scylla_metadata::large_data_stats> _large_data_stats;
sstring _origin;
// Total reclaimable memory from all the components of the SSTable.
// It is initialized to 0 to prevent the sstables manager from reclaiming memory
// from the components before the SSTable has been fully loaded.
mutable std::optional<size_t> _total_reclaimable_memory{0};
public:
bool has_component(component_type f) const;
sstables_manager& manager() { return _manager; }
@@ -679,6 +684,12 @@ private:
future<> create_data() noexcept;
// Return the total reclaimable memory in this SSTable
size_t total_reclaimable_memory_size() const;
// Reclaim memory from the components back to the system.
// Note that only bloom filters are reclaimable.
size_t reclaim_memory_from_components();
public:
// Finds first position_in_partition in a given partition.
// If reversed is false, then the first position is actually the first row (can be the static one).

View File

@@ -25,6 +25,7 @@ logging::logger smlogger("sstables_manager");
sstables_manager::sstables_manager(
sstring name, db::large_data_handler& large_data_handler, const db::config& dbcfg, gms::feature_service& feat, cache_tracker& ct, size_t available_memory, directory_semaphore& dir_sem, noncopyable_function<locator::host_id()>&& resolve_host_id, storage_manager* shared)
: _storage(shared)
, _available_memory(available_memory)
, _large_data_handler(large_data_handler), _db_config(dbcfg), _features(feat), _cache_tracker(ct)
, _sstable_metadata_concurrency_sem(
max_count_sstable_metadata_concurrent_reads,
@@ -146,11 +147,35 @@ sstable_writer_config sstables_manager::configure_writer(sstring origin) const {
return cfg;
}
void sstables_manager::increment_total_reclaimable_memory_and_maybe_reclaim(sstable* sst) {
_total_reclaimable_memory += sst->total_reclaimable_memory_size();
size_t memory_reclaim_threshold = _available_memory * _db_config.components_memory_reclaim_threshold();
if (_total_reclaimable_memory <= memory_reclaim_threshold) {
// total memory used is within limit; no need to reclaim.
return;
}
// Memory consumption has crossed threshold. Reclaim from the SSTable that
// has the most reclaimable memory to get the total consumption under limit.
auto sst_with_max_memory = std::max_element(_active.begin(), _active.end(), [](const sstable& sst1, const sstable& sst2) {
return sst1.total_reclaimable_memory_size() < sst2.total_reclaimable_memory_size();
});
auto memory_reclaimed = sst_with_max_memory->reclaim_memory_from_components();
_total_memory_reclaimed += memory_reclaimed;
_total_reclaimable_memory -= memory_reclaimed;
smlogger.info("Reclaimed {} bytes of memory from SSTable components. Total memory reclaimed so far is {} bytes", memory_reclaimed, _total_memory_reclaimed);
}
void sstables_manager::add(sstable* sst) {
_active.push_back(*sst);
}
void sstables_manager::deactivate(sstable* sst) {
// Remove SSTable from the reclaimable memory tracking
_total_reclaimable_memory -= sst->total_reclaimable_memory_size();
// At this point, sst has a reference count of zero, since we got here from
// lw_shared_ptr_deleter<sstables::sstable>::dispose().
_active.erase(_active.iterator_to(*sst));

View File

@@ -79,6 +79,7 @@ class sstables_manager {
boost::intrusive::constant_time_size<false>>;
private:
storage_manager* _storage;
size_t _available_memory;
db::large_data_handler& _large_data_handler;
const db::config& _db_config;
gms::feature_service& _features;
@@ -96,6 +97,11 @@ private:
list_type _active;
list_type _undergoing_close;
// Total reclaimable memory used by components of sstables in _active list
size_t _total_reclaimable_memory{0};
// Total memory reclaimed so far across all sstables
size_t _total_memory_reclaimed{0};
bool _closing = false;
promise<> _done;
cache_tracker& _cache_tracker;
@@ -183,11 +189,19 @@ private:
static constexpr size_t max_count_sstable_metadata_concurrent_reads{10};
// Allow at most 10% of memory to be filled with such reads.
size_t max_memory_sstable_metadata_concurrent_reads(size_t available_memory) { return available_memory * 0.1; }
// Increment the _total_reclaimable_memory with the new SSTable's reclaimable
// memory and if the total memory usage exceeds the pre-defined threshold,
// reclaim it from the SSTable that has the most reclaimable memory.
void increment_total_reclaimable_memory_and_maybe_reclaim(sstable* sst);
private:
db::large_data_handler& get_large_data_handler() const {
return _large_data_handler;
}
friend class sstable;
// Allow testing private methods/variables via test_env_sstables_manager
friend class test_env_sstables_manager;
};
} // namespace sstables

View File

@@ -3176,3 +3176,70 @@ SEASTAR_TEST_CASE(test_sstable_set_predicate) {
}
});
}
SEASTAR_TEST_CASE(test_sstable_reclaim_memory_from_components) {
return test_env::do_with_async([] (test_env& env) {
simple_schema ss;
auto schema_ptr = ss.schema();
auto sst = env.make_sstable(schema_ptr);
// create a bloom filter
auto sst_test = sstables::test(sst);
sst_test.create_bloom_filter(100);
auto total_reclaimable_memory = sst_test.total_reclaimable_memory_size();
// Test sstable::reclaim_memory_from_components() :
BOOST_REQUIRE_EQUAL(sst_test.reclaim_memory_from_components(), total_reclaimable_memory);
BOOST_REQUIRE_EQUAL(sst_test.total_reclaimable_memory_size(), 0);
BOOST_REQUIRE_EQUAL(sst->filter_memory_size(), 0);
});
}
std::pair<shared_sstable, size_t> create_sstable_with_bloom_filter(test_env& env, test_env_sstables_manager& sst_mgr, schema_ptr sptr, uint64_t estimated_partitions) {
auto sst = env.make_sstable(sptr);
sstables::test(sst).create_bloom_filter(estimated_partitions);
auto sst_bf_memory = sst->filter_memory_size();
sst_mgr.increment_total_reclaimable_memory_and_maybe_reclaim(sst.get());
return {sst, sst_bf_memory};
}
SEASTAR_TEST_CASE(test_sstable_manager_auto_reclaim_under_pressure) {
return test_env::do_with_async([] (test_env& env) {
simple_schema ss;
auto schema_ptr = ss.schema();
auto& sst_mgr = env.manager();
// Verify nothing it reclaimed when under threshold
auto [sst1, sst1_bf_memory] = create_sstable_with_bloom_filter(env, sst_mgr, schema_ptr, 70);
BOOST_REQUIRE_EQUAL(sst1->filter_memory_size(), sst1_bf_memory);
BOOST_REQUIRE_EQUAL(sst_mgr.get_total_memory_reclaimed(), 0);
auto [sst2, sst2_bf_memory] = create_sstable_with_bloom_filter(env, sst_mgr, schema_ptr, 20);
// Confirm reclaim was still not triggered
BOOST_REQUIRE_EQUAL(sst1->filter_memory_size(), sst1_bf_memory);
BOOST_REQUIRE_EQUAL(sst2->filter_memory_size(), sst2_bf_memory);
BOOST_REQUIRE_EQUAL(sst_mgr.get_total_memory_reclaimed(), 0);
// Verify manager reclaims from the largest sst when the total usage crosses thresold.
auto [sst3, sst3_bf_memory] = create_sstable_with_bloom_filter(env, sst_mgr, schema_ptr, 50);
// sst1 has the most reclaimable memory
BOOST_REQUIRE_EQUAL(sst1->filter_memory_size(), 0);
BOOST_REQUIRE_EQUAL(sst2->filter_memory_size(), sst2_bf_memory);
BOOST_REQUIRE_EQUAL(sst3->filter_memory_size(), sst3_bf_memory);
BOOST_REQUIRE_EQUAL(sst_mgr.get_total_memory_reclaimed(), sst1_bf_memory);
// Reclaim should also work on the latest sst being added
auto [sst4, sst4_bf_memory] = create_sstable_with_bloom_filter(env, sst_mgr, schema_ptr, 100);
// sst4 should have been reclaimed
BOOST_REQUIRE_EQUAL(sst1->filter_memory_size(), 0);
BOOST_REQUIRE_EQUAL(sst2->filter_memory_size(), sst2_bf_memory);
BOOST_REQUIRE_EQUAL(sst3->filter_memory_size(), sst3_bf_memory);
BOOST_REQUIRE_EQUAL(sst4->filter_memory_size(), 0);
BOOST_REQUIRE_EQUAL(sst_mgr.get_total_memory_reclaimed(), sst1_bf_memory + sst4_bf_memory);
}, {
// limit available memory to the sstables_manager to test reclaiming.
// this will set the reclaim threshold to 100 bytes.
.available_memory = 1000
});
}

View File

@@ -5,14 +5,20 @@
import pytest
import rest_api
import nodetool
from util import new_test_table
from util import new_test_table, config_value_context
from cassandra.protocol import ConfigurationException
# Disable component memory reclamation by setting the threshold to max value
@pytest.fixture(scope="module")
def disable_component_memory_reclaim(cql):
with config_value_context(cql, 'components_memory_reclaim_threshold', '1'):
yield
# Test inserts `N` rows into table, flushes it
# and tries to read `M` non-existing keys.
# Then bloom filter's false-positive ratio is checked.
@pytest.mark.parametrize("N,M,fp_chance", [(500, 1000, 0.1)])
def test_bloom_filter(scylla_only, cql, test_keyspace, N, M, fp_chance):
def test_bloom_filter(scylla_only, cql, test_keyspace, disable_component_memory_reclaim, N, M, fp_chance):
def run_test(cql, test_keyspace, N, M, fp_chance):
with new_test_table(cql, test_keyspace, "a int PRIMARY KEY",
f"WITH bloom_filter_fp_chance = {fp_chance}") as table:

View File

@@ -45,6 +45,14 @@ public:
void set_promoted_index_block_size(size_t promoted_index_block_size) {
_promoted_index_block_size = promoted_index_block_size;
}
void increment_total_reclaimable_memory_and_maybe_reclaim(sstable *sst) {
sstables_manager::increment_total_reclaimable_memory_and_maybe_reclaim(sst);
}
size_t get_total_memory_reclaimed() {
return _total_memory_reclaimed;
}
};
class test_env_compaction_manager {
@@ -67,6 +75,7 @@ struct test_env_config {
db::large_data_handler* large_data_handler = nullptr;
data_dictionary::storage_options storage; // will be local by default
bool use_uuid = true;
size_t available_memory = memory::stats().total_memory();
};
data_dictionary::storage_options make_test_object_storage_options();

View File

@@ -211,6 +211,19 @@ public:
co_await _sst->_storage->move(*_sst, std::move(new_dir), new_generation, nullptr);
_sst->_generation = std::move(new_generation);
}
void create_bloom_filter(uint64_t estimated_partitions, double max_false_pos_prob = 0.1) {
_sst->_components->filter = utils::i_filter::get_filter(estimated_partitions, max_false_pos_prob, utils::filter_format::m_format);
_sst->_total_reclaimable_memory.reset();
}
size_t total_reclaimable_memory_size() const {
return _sst->total_reclaimable_memory_size();
}
size_t reclaim_memory_from_components() {
return _sst->reclaim_memory_from_components();
}
};
inline auto replacer_fn_no_op() {

View File

@@ -211,7 +211,7 @@ test_env::impl::impl(test_env_config cfg, sstables::storage_manager* sstm)
, dir_sem(1)
, feature_service(gms::feature_config_from_db_config(*db_config))
, mgr("test_env", cfg.large_data_handler == nullptr ? nop_ld_handler : *cfg.large_data_handler, *db_config,
feature_service, cache_tracker, memory::stats().total_memory(), dir_sem,
feature_service, cache_tracker, cfg.available_memory, dir_sem,
[host_id = locator::host_id::create_random_id()]{ return host_id; }, sstm)
, semaphore(reader_concurrency_semaphore::no_limits{}, "sstables::test_env", reader_concurrency_semaphore::register_metrics::no)
, use_uuid(cfg.use_uuid)