db/view: view_update_generator: re-balance wait/signal on the register semaphore

The view update generator has a semaphore to limit concurrency. This
semaphore is waited on in `register_staging_sstable()` and later the
unit is returned after the sstable is processed in the loop inside
`start()`.
This was broken by 4e64002, which changed the loop inside `start()` to
process sstables in per-table batches, but didn't change the
`signal()` call to return a number of units matching the number of
sstables processed. This can cause the semaphore units to dry up, as the
loop can process multiple sstables per table but return just a single
unit. This can also block callers of `register_staging_sstable()`
indefinitely: under the right circumstances the units on the semaphore
can permanently go below 0, so some waiters will never be released.
In addition to this, 4e64002 introduced another bug: table entries in
`_sstables_with_tables` are never removed, so they are processed
every turn. If the sstable list is empty, no updates are generated,
but due to the unconditional `signal()` described above, the units on
the semaphore can grow without bound, allowing
future staging sstable producers to register a huge number of sstables,
causing memory problems due to the number of sstable readers that have
to be opened (#6603, #6707).
Both outcomes are equally bad. This patch fixes both issues and modifies
the `test_view_update_generator` unit test to reproduce them and hence
to verify that they don't recur in the future.

Fixes: #6774
Refs: #6707
Refs: #6603

Tests: unit(dev)
Signed-off-by: Botond Dénes <bdenes@scylladb.com>
Message-Id: <20200706135108.116134-1-bdenes@scylladb.com>
(cherry picked from commit 5ebe2c28d1)
This commit is contained in:
Botond Dénes
2020-07-06 16:51:08 +03:00
committed by Avi Kivity
parent 76618a7e06
commit fea83f6ae0
3 changed files with 49 additions and 11 deletions

View File

@@ -44,11 +44,16 @@ future<> view_update_generator::start() {
// If we got here, we will process all tables we know about so far eventually so there
// is no starvation
for (auto& t : _sstables_with_tables | boost::adaptors::map_keys) {
for (auto table_it = _sstables_with_tables.begin(); table_it != _sstables_with_tables.end(); table_it = _sstables_with_tables.erase(table_it)) {
auto& [t, t_sstables] = *table_it;
schema_ptr s = t->schema();
vug_logger.trace("Processing {}.{}: {} sstables", s->ks_name(), s->cf_name(), t_sstables.size());
// Copy what we have so far so we don't miss new updates
auto sstables = std::exchange(_sstables_with_tables[t], {});
auto sstables = std::exchange(t_sstables, {});
const auto num_sstables = sstables.size();
try {
// temporary: need an sstable set for the flat mutation reader, but the
@@ -89,7 +94,7 @@ future<> view_update_generator::start() {
// Move from staging will be retried upon restart.
vug_logger.warn("Moving {} from staging failed: {}:{}. Ignoring...", s->ks_name(), s->cf_name(), std::current_exception());
}
_registration_sem.signal();
_registration_sem.signal(num_sstables);
}
// For each table, move the processed staging sstables into the table's base dir.
for (auto it = _sstables_to_move.begin(); it != _sstables_to_move.end(); ) {

View File

@@ -32,7 +32,10 @@
namespace db::view {
class view_update_generator {
public:
static constexpr size_t registration_queue_size = 5;
private:
database& _db;
seastar::abort_source _as;
future<> _started = make_ready_future<>();
@@ -51,6 +54,8 @@ public:
future<> start();
future<> stop();
future<> register_staging_sstable(sstables::shared_sstable sst, lw_shared_ptr<table> table);
ssize_t available_register_units() const { return _registration_sem.available_units(); }
private:
bool should_throttle() const;
};

View File

@@ -421,23 +421,49 @@ SEASTAR_TEST_CASE(test_view_update_generator) {
auto& view_update_generator = e.local_view_update_generator();
auto s = test_table_schema();
std::vector<shared_sstable> ssts;
lw_shared_ptr<table> t = e.local_db().find_column_family("ks", "t").shared_from_this();
auto write_to_sstable = [&] (mutation m) {
auto sst = t->make_streaming_staging_sstable();
sstables::sstable_writer_config sst_cfg = test_sstables_manager.configure_writer();
auto& pc = service::get_local_streaming_priority();
sst->write_components(flat_mutation_reader_from_mutations({m}), 1ul, s, sst_cfg, {}, pc).get();
sst->open_data().get();
t->add_sstable_and_update_cache(sst).get();
return sst;
};
auto key = partition_key::from_exploded(*s, {to_bytes(key1)});
mutation m(s, key);
auto col = s->get_column_definition("v");
for (int i = 1024; i < 1280; ++i) {
auto& row = m.partition().clustered_row(*s, clustering_key::from_exploded(*s, {to_bytes(fmt::format("c{}", i))}));
row.cells().apply(*col, atomic_cell::make_live(*col->type, 2345, col->type->decompose(sstring(fmt::format("v{}", i)))));
// Scatter the data in a bunch of different sstables, so we
// can test the registration semaphore of the view update
// generator
if (!(i % 10)) {
ssts.push_back(write_to_sstable(std::exchange(m, mutation(s, key))));
}
}
lw_shared_ptr<table> t = e.local_db().find_column_family("ks", "t").shared_from_this();
ssts.push_back(write_to_sstable(std::move(m)));
auto sst = t->make_streaming_staging_sstable();
sstables::sstable_writer_config sst_cfg = test_sstables_manager.configure_writer();
auto& pc = service::get_local_streaming_priority();
BOOST_REQUIRE_EQUAL(view_update_generator.available_register_units(), db::view::view_update_generator::registration_queue_size);
sst->write_components(flat_mutation_reader_from_mutations({m}), 1ul, s, sst_cfg, {}, pc).get();
sst->open_data().get();
t->add_sstable_and_update_cache(sst).get();
view_update_generator.register_staging_sstable(sst, t).get();
parallel_for_each(ssts.begin(), ssts.begin() + 10, [&] (shared_sstable& sst) {
return view_update_generator.register_staging_sstable(sst, t);
}).get();
BOOST_REQUIRE_EQUAL(view_update_generator.available_register_units(), db::view::view_update_generator::registration_queue_size);
parallel_for_each(ssts.begin() + 10, ssts.end(), [&] (shared_sstable& sst) {
return view_update_generator.register_staging_sstable(sst, t);
}).get();
BOOST_REQUIRE_EQUAL(view_update_generator.available_register_units(), db::view::view_update_generator::registration_queue_size);
eventually([&, key1, key2] {
auto msg = e.execute_cql(fmt::format("SELECT * FROM t WHERE p = '{}'", key1)).get0();
@@ -464,5 +490,7 @@ SEASTAR_TEST_CASE(test_view_update_generator) {
}
});
BOOST_REQUIRE_EQUAL(view_update_generator.available_register_units(), db::view::view_update_generator::registration_queue_size);
});
}