db/view: view_update_generator: re-balance wait/signal on the register semaphore
The view update generator has a semaphore to limit concurrency. This semaphore is waited on in `register_staging_sstable()` and the unit is later returned after the sstable is processed in the loop inside `start()`. This was broken by 4e64002, which changed the loop inside `start()` to process sstables in per-table batches, but didn't change the `signal()` call to return units according to the number of sstables processed. This can cause the semaphore units to dry up, as the loop can process multiple sstables per table but return just a single unit. This can also block callers of `register_staging_sstable()` indefinitely, as some waiters will never be released: under the right circumstances the units on the semaphore can permanently go below 0. In addition to this, 4e64002 introduced another bug: table entries are never removed from `_sstables_with_tables`, so they are processed on every turn. If the sstable list is empty, no update is generated, but due to the unconditional `signal()` described above this can cause the units on the semaphore to grow without bound, allowing future staging-sstable producers to register a huge number of sstables and causing memory problems due to the number of sstable readers that have to be opened (#6603, #6707). Both outcomes are equally bad. This patch fixes both issues and modifies the `test_view_update_generator` unit test to reproduce them and hence verify that they don't happen in the future. Fixes: #6774 Refs: #6707 Refs: #6603 Tests: unit(dev) Signed-off-by: Botond Dénes <bdenes@scylladb.com> Message-Id: <20200706135108.116134-1-bdenes@scylladb.com> (cherry picked from commit 5ebe2c28d1)
This commit is contained in:
@@ -44,11 +44,16 @@ future<> view_update_generator::start() {
|
||||
|
||||
// If we got here, we will process all tables we know about so far eventually so there
|
||||
// is no starvation
|
||||
for (auto& t : _sstables_with_tables | boost::adaptors::map_keys) {
|
||||
for (auto table_it = _sstables_with_tables.begin(); table_it != _sstables_with_tables.end(); table_it = _sstables_with_tables.erase(table_it)) {
|
||||
auto& [t, t_sstables] = *table_it;
|
||||
schema_ptr s = t->schema();
|
||||
|
||||
vug_logger.trace("Processing {}.{}: {} sstables", s->ks_name(), s->cf_name(), t_sstables.size());
|
||||
|
||||
// Copy what we have so far so we don't miss new updates
|
||||
auto sstables = std::exchange(_sstables_with_tables[t], {});
|
||||
auto sstables = std::exchange(t_sstables, {});
|
||||
|
||||
const auto num_sstables = sstables.size();
|
||||
|
||||
try {
|
||||
// temporary: need an sstable set for the flat mutation reader, but the
|
||||
@@ -89,7 +94,7 @@ future<> view_update_generator::start() {
|
||||
// Move from staging will be retried upon restart.
|
||||
vug_logger.warn("Moving {} from staging failed: {}:{}. Ignoring...", s->ks_name(), s->cf_name(), std::current_exception());
|
||||
}
|
||||
_registration_sem.signal();
|
||||
_registration_sem.signal(num_sstables);
|
||||
}
|
||||
// For each table, move the processed staging sstables into the table's base dir.
|
||||
for (auto it = _sstables_to_move.begin(); it != _sstables_to_move.end(); ) {
|
||||
|
||||
@@ -32,7 +32,10 @@
|
||||
namespace db::view {
|
||||
|
||||
class view_update_generator {
|
||||
public:
|
||||
static constexpr size_t registration_queue_size = 5;
|
||||
|
||||
private:
|
||||
database& _db;
|
||||
seastar::abort_source _as;
|
||||
future<> _started = make_ready_future<>();
|
||||
@@ -51,6 +54,8 @@ public:
|
||||
future<> start();
|
||||
future<> stop();
|
||||
future<> register_staging_sstable(sstables::shared_sstable sst, lw_shared_ptr<table> table);
|
||||
|
||||
ssize_t available_register_units() const { return _registration_sem.available_units(); }
|
||||
private:
|
||||
bool should_throttle() const;
|
||||
};
|
||||
|
||||
@@ -421,23 +421,49 @@ SEASTAR_TEST_CASE(test_view_update_generator) {
|
||||
auto& view_update_generator = e.local_view_update_generator();
|
||||
auto s = test_table_schema();
|
||||
|
||||
std::vector<shared_sstable> ssts;
|
||||
|
||||
lw_shared_ptr<table> t = e.local_db().find_column_family("ks", "t").shared_from_this();
|
||||
|
||||
auto write_to_sstable = [&] (mutation m) {
|
||||
auto sst = t->make_streaming_staging_sstable();
|
||||
sstables::sstable_writer_config sst_cfg = test_sstables_manager.configure_writer();
|
||||
auto& pc = service::get_local_streaming_priority();
|
||||
|
||||
sst->write_components(flat_mutation_reader_from_mutations({m}), 1ul, s, sst_cfg, {}, pc).get();
|
||||
sst->open_data().get();
|
||||
t->add_sstable_and_update_cache(sst).get();
|
||||
return sst;
|
||||
};
|
||||
|
||||
auto key = partition_key::from_exploded(*s, {to_bytes(key1)});
|
||||
mutation m(s, key);
|
||||
auto col = s->get_column_definition("v");
|
||||
for (int i = 1024; i < 1280; ++i) {
|
||||
auto& row = m.partition().clustered_row(*s, clustering_key::from_exploded(*s, {to_bytes(fmt::format("c{}", i))}));
|
||||
row.cells().apply(*col, atomic_cell::make_live(*col->type, 2345, col->type->decompose(sstring(fmt::format("v{}", i)))));
|
||||
// Scatter the data in a bunch of different sstables, so we
|
||||
// can test the registration semaphore of the view update
|
||||
// generator
|
||||
if (!(i % 10)) {
|
||||
ssts.push_back(write_to_sstable(std::exchange(m, mutation(s, key))));
|
||||
}
|
||||
}
|
||||
lw_shared_ptr<table> t = e.local_db().find_column_family("ks", "t").shared_from_this();
|
||||
ssts.push_back(write_to_sstable(std::move(m)));
|
||||
|
||||
auto sst = t->make_streaming_staging_sstable();
|
||||
sstables::sstable_writer_config sst_cfg = test_sstables_manager.configure_writer();
|
||||
auto& pc = service::get_local_streaming_priority();
|
||||
BOOST_REQUIRE_EQUAL(view_update_generator.available_register_units(), db::view::view_update_generator::registration_queue_size);
|
||||
|
||||
sst->write_components(flat_mutation_reader_from_mutations({m}), 1ul, s, sst_cfg, {}, pc).get();
|
||||
sst->open_data().get();
|
||||
t->add_sstable_and_update_cache(sst).get();
|
||||
view_update_generator.register_staging_sstable(sst, t).get();
|
||||
parallel_for_each(ssts.begin(), ssts.begin() + 10, [&] (shared_sstable& sst) {
|
||||
return view_update_generator.register_staging_sstable(sst, t);
|
||||
}).get();
|
||||
|
||||
BOOST_REQUIRE_EQUAL(view_update_generator.available_register_units(), db::view::view_update_generator::registration_queue_size);
|
||||
|
||||
parallel_for_each(ssts.begin() + 10, ssts.end(), [&] (shared_sstable& sst) {
|
||||
return view_update_generator.register_staging_sstable(sst, t);
|
||||
}).get();
|
||||
|
||||
BOOST_REQUIRE_EQUAL(view_update_generator.available_register_units(), db::view::view_update_generator::registration_queue_size);
|
||||
|
||||
eventually([&, key1, key2] {
|
||||
auto msg = e.execute_cql(fmt::format("SELECT * FROM t WHERE p = '{}'", key1)).get0();
|
||||
@@ -464,5 +490,7 @@ SEASTAR_TEST_CASE(test_view_update_generator) {
|
||||
|
||||
}
|
||||
});
|
||||
|
||||
BOOST_REQUIRE_EQUAL(view_update_generator.available_register_units(), db::view::view_update_generator::registration_queue_size);
|
||||
});
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user