From cd849ed40db6219192fbeae5441c2b8e03f26e64 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Botond=20D=C3=A9nes?= Date: Tue, 7 Jul 2020 12:13:17 +0300 Subject: [PATCH 01/10] database: add make_restricted_range_sstable_reader() A variant of `make_range_sstable_reader()` that wraps the reader in a restricting reader, hence making it wait for admission on the read concurrency semaphore, before starting to actually read. --- database.hh | 19 +++++++++++++++++++ table.cc | 26 ++++++++++++++++++++++++++ 2 files changed, 45 insertions(+) diff --git a/database.hh b/database.hh index 15bdc19917..d0fb002b74 100644 --- a/database.hh +++ b/database.hh @@ -1024,6 +1024,10 @@ flat_mutation_reader make_local_shard_sstable_reader(schema_ptr s, mutation_reader::forwarding fwd_mr, sstables::read_monitor_generator& monitor_generator = sstables::default_read_monitor_generator()); +/// Read a range from the passed-in sstables. +/// +/// The reader is unrestricted, but will account its resource usage on the +/// semaphore belonging to the passed-in permit. flat_mutation_reader make_range_sstable_reader(schema_ptr s, reader_permit permit, lw_shared_ptr sstables, @@ -1035,6 +1039,21 @@ flat_mutation_reader make_range_sstable_reader(schema_ptr s, mutation_reader::forwarding fwd_mr, sstables::read_monitor_generator& monitor_generator = sstables::default_read_monitor_generator()); +/// Read a range from the passed-in sstables. +/// +/// The reader is restricted, that is it will wait for admission on the semaphore +/// belonging to the passed-in permit, before starting to read. 
+flat_mutation_reader make_restricted_range_sstable_reader(schema_ptr s, + reader_permit permit, + lw_shared_ptr sstables, + const dht::partition_range& pr, + const query::partition_slice& slice, + const io_priority_class& pc, + tracing::trace_state_ptr trace_state, + streamed_mutation::forwarding fwd, + mutation_reader::forwarding fwd_mr, + sstables::read_monitor_generator& monitor_generator = sstables::default_read_monitor_generator()); + class user_types_metadata; class keyspace_metadata final { diff --git a/table.cc b/table.cc index 30bb86ee0d..fc5ad98577 100644 --- a/table.cc +++ b/table.cc @@ -327,6 +327,32 @@ flat_mutation_reader make_range_sstable_reader(schema_ptr s, fwd_mr); } +flat_mutation_reader make_restricted_range_sstable_reader(schema_ptr s, + reader_permit permit, + lw_shared_ptr sstables, + const dht::partition_range& pr, + const query::partition_slice& slice, + const io_priority_class& pc, + tracing::trace_state_ptr trace_state, + streamed_mutation::forwarding fwd, + mutation_reader::forwarding fwd_mr, + sstables::read_monitor_generator& monitor_generator) +{ + auto ms = mutation_source([sstables=std::move(sstables), &monitor_generator] ( + schema_ptr s, + reader_permit permit, + const dht::partition_range& pr, + const query::partition_slice& slice, + const io_priority_class& pc, + tracing::trace_state_ptr trace_state, + streamed_mutation::forwarding fwd, + mutation_reader::forwarding fwd_mr) { + return make_range_sstable_reader(std::move(s), std::move(permit), std::move(sstables), pr, slice, pc, + std::move(trace_state), fwd, fwd_mr, monitor_generator); + }); + return make_restricted_flat_reader(std::move(ms), std::move(s), std::move(permit), pr, slice, pc, std::move(trace_state), fwd, fwd_mr); +} + flat_mutation_reader table::make_sstable_reader(schema_ptr s, reader_permit permit, From 84357f072294fd81ae19649361571564ac92ee90 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Botond=20D=C3=A9nes?= Date: Tue, 7 Jul 2020 17:00:27 +0300 Subject: [PATCH 
02/10] db/view: view_updating_consumer: move implementation from table.cc to view.cc table.cc is a very counter-intuitive place for view related stuff, especially if the declarations reside in `db/view/`. --- db/view/view.cc | 14 ++++++++++++++ table.cc | 14 -------------- 2 files changed, 14 insertions(+), 14 deletions(-) diff --git a/db/view/view.cc b/db/view/view.cc index b2b9dafc3d..ad28d7085e 100644 --- a/db/view/view.cc +++ b/db/view/view.cc @@ -58,6 +58,7 @@ #include "cql3/util.hh" #include "db/view/view.hh" #include "db/view/view_builder.hh" +#include "db/view/view_updating_consumer.hh" #include "db/system_keyspace_view_types.hh" #include "db/system_keyspace.hh" #include "frozen_mutation.hh" @@ -1912,5 +1913,18 @@ future check_needs_view_update_path(db::system_distributed_keyspace& sys_d }); } +stop_iteration view_updating_consumer::consume_end_of_partition() { + if (_as->abort_requested()) { + return stop_iteration::yes; + } + try { + auto lock_holder = _table->stream_view_replica_updates(_schema, std::move(*_m), db::no_timeout, _excluded_sstables).get(); + } catch (...) 
{ + vlogger.warn("Failed to push replica updates for table {}.{}: {}", _schema->ks_name(), _schema->cf_name(), std::current_exception()); + } + _m.reset(); + return stop_iteration::no; +} + } // namespace view } // namespace db diff --git a/table.cc b/table.cc index fc5ad98577..fc51b4967e 100644 --- a/table.cc +++ b/table.cc @@ -23,7 +23,6 @@ #include "sstables/sstables.hh" #include "sstables/sstables_manager.hh" #include "service/priority_manager.hh" -#include "db/view/view_updating_consumer.hh" #include "db/schema_tables.hh" #include "cell_locking.hh" #include "mutation_fragment.hh" @@ -2305,16 +2304,3 @@ table::as_mutation_source_excluding(std::vector& ssts) return this->make_reader_excluding_sstables(std::move(s), std::move(permit), ssts, range, slice, pc, std::move(trace_state), fwd, fwd_mr); }); } - -stop_iteration db::view::view_updating_consumer::consume_end_of_partition() { - if (_as->abort_requested()) { - return stop_iteration::yes; - } - try { - auto lock_holder = _table->stream_view_replica_updates(_schema, std::move(*_m), db::no_timeout, _excluded_sstables).get(); - } catch (...) { - tlogger.warn("Failed to push replica updates for table {}.{}: {}", _schema->ks_name(), _schema->cf_name(), std::current_exception()); - } - _m.reset(); - return stop_iteration::no; -} From 0166f970964ee688e18a5b52f8a94a5ec8831671 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Botond=20D=C3=A9nes?= Date: Tue, 7 Jul 2020 10:09:51 +0300 Subject: [PATCH 03/10] db/view: view_update_generator: make staging reader evictable The view update generation process creates two readers. One is used to read the staging sstables, the data which needs view updates to be generated for, and another reader for each processed mutation, which reads the current value (pre-image) of each row in said mutation. The staging reader is created first and is kept alive until all staging data is processed. The pre-image reader is created separately for each processed mutation. 
The staging reader is not restricted, meaning it does not wait for admission on the relevant reader concurrency semaphore, but it does register its resource usage on it. The pre-image reader however *is* restricted. This creates a situation, where the staging reader possibly consumes all resources from the semaphore, leaving none for the later created pre-image reader, which will not be able to start reading. This will block the view building process meaning that the staging reader will not be destroyed, causing a deadlock. This patch solves this by making the staging reader restricted and making it evictable. To prevent thrashing -- evicting the staging reader after reading only a really small partition -- we only make the staging reader evictable after we have read at least 1MB worth of data from it. --- db/view/view.cc | 39 +++++++++++++++++------ db/view/view_update_generator.cc | 19 +++++++++--- db/view/view_updating_consumer.hh | 51 ++++++++++++++++++++++++++++--- 3 files changed, 91 insertions(+), 18 deletions(-) diff --git a/db/view/view.cc b/db/view/view.cc index ad28d7085e..ac411b51e6 100644 --- a/db/view/view.cc +++ b/db/view/view.cc @@ -1913,17 +1913,38 @@ future check_needs_view_update_path(db::system_distributed_keyspace& sys_d }); } -stop_iteration view_updating_consumer::consume_end_of_partition() { - if (_as->abort_requested()) { - return stop_iteration::yes; +const size_t view_updating_consumer::buffer_size_soft_limit{1 * 1024 * 1024}; +const size_t view_updating_consumer::buffer_size_hard_limit{2 * 1024 * 1024}; + +void view_updating_consumer::do_flush_buffer() { + _staging_reader_handle.pause(); + + if (_buffer.front().partition().empty()) { + // If we flushed mid-partition we can have an empty mutation if we + // flushed right before getting the end-of-partition fragment. + _buffer.pop_front(); } - try { - auto lock_holder = _table->stream_view_replica_updates(_schema, std::move(*_m), db::no_timeout, _excluded_sstables).get(); - } catch (...) 
{ - vlogger.warn("Failed to push replica updates for table {}.{}: {}", _schema->ks_name(), _schema->cf_name(), std::current_exception()); + + while (!_buffer.empty()) { + try { + auto lock_holder = _table->stream_view_replica_updates(_schema, std::move(_buffer.front()), db::no_timeout, _excluded_sstables).get(); + } catch (...) { + vlogger.warn("Failed to push replica updates for table {}.{}: {}", _schema->ks_name(), _schema->cf_name(), std::current_exception()); + } + _buffer.pop_front(); + } + + _buffer_size = 0; + _m = nullptr; +} + +void view_updating_consumer::maybe_flush_buffer_mid_partition() { + if (_buffer_size >= buffer_size_hard_limit) { + auto m = mutation(_schema, _m->decorated_key(), mutation_partition(_schema)); + do_flush_buffer(); + _buffer.emplace_back(std::move(m)); + _m = &_buffer.back(); } - _m.reset(); - return stop_iteration::no; } } // namespace view diff --git a/db/view/view_update_generator.cc b/db/view/view_update_generator.cc index 4fdc363887..624d192e5f 100644 --- a/db/view/view_update_generator.cc +++ b/db/view/view_update_generator.cc @@ -64,18 +64,29 @@ future<> view_update_generator::start() { ssts->insert(sst); } - flat_mutation_reader staging_sstable_reader = ::make_range_sstable_reader(s, + auto ms = mutation_source([this, ssts] ( + schema_ptr s, + reader_permit permit, + const dht::partition_range& pr, + const query::partition_slice& ps, + const io_priority_class& pc, + tracing::trace_state_ptr ts, + streamed_mutation::forwarding fwd_ms, + mutation_reader::forwarding fwd_mr) { + return ::make_restricted_range_sstable_reader(s, std::move(permit), std::move(ssts), pr, ps, pc, std::move(ts), fwd_ms, fwd_mr); + }); + auto [staging_sstable_reader, staging_sstable_reader_handle] = make_manually_paused_evictable_reader( + std::move(ms), + s, _db.make_query_class_config().semaphore.make_permit(), - std::move(ssts), query::full_partition_range, s->full_slice(), service::get_local_streaming_priority(), nullptr, - 
::streamed_mutation::forwarding::no, ::mutation_reader::forwarding::no); inject_failure("view_update_generator_consume_staging_sstable"); - auto result = staging_sstable_reader.consume_in_thread(view_updating_consumer(s, *t, sstables, _as), db::no_timeout); + auto result = staging_sstable_reader.consume_in_thread(view_updating_consumer(s, *t, sstables, _as, staging_sstable_reader_handle), db::no_timeout); if (result == stop_iteration::yes) { break; } diff --git a/db/view/view_updating_consumer.hh b/db/view/view_updating_consumer.hh index ee0a2bf232..e1fa7457e4 100644 --- a/db/view/view_updating_consumer.hh +++ b/db/view/view_updating_consumer.hh @@ -27,6 +27,8 @@ #include "sstables/shared_sstable.hh" #include "database.hh" +class evictable_reader_handle; + namespace db::view { /* @@ -34,22 +36,44 @@ namespace db::view { * It is expected to be run in seastar::async threaded context through consume_in_thread() */ class view_updating_consumer { +public: + // We prefer flushing on partition boundaries, so at the end of a partition, + // we flush on reaching the soft limit. Otherwise we continue accumulating + // data. We flush mid-partition if we reach the hard limit. 
+ static const size_t buffer_size_soft_limit; + static const size_t buffer_size_hard_limit; + +private: schema_ptr _schema; lw_shared_ptr _table; std::vector _excluded_sstables; const seastar::abort_source* _as; - std::optional _m; + evictable_reader_handle& _staging_reader_handle; + circular_buffer _buffer; + mutation* _m{nullptr}; + size_t _buffer_size{0}; + +private: + void do_flush_buffer(); + void maybe_flush_buffer_mid_partition(); + public: - view_updating_consumer(schema_ptr schema, table& table, std::vector excluded_sstables, const seastar::abort_source& as) + view_updating_consumer(schema_ptr schema, table& table, std::vector excluded_sstables, const seastar::abort_source& as, + evictable_reader_handle& staging_reader_handle) : _schema(std::move(schema)) , _table(table.shared_from_this()) , _excluded_sstables(std::move(excluded_sstables)) , _as(&as) - , _m() + , _staging_reader_handle(staging_reader_handle) { } + view_updating_consumer(view_updating_consumer&&) = default; + + view_updating_consumer& operator=(view_updating_consumer&&) = delete; + void consume_new_partition(const dht::decorated_key& dk) { - _m = mutation(_schema, dk, mutation_partition(_schema)); + _buffer.emplace_back(_schema, dk, mutation_partition(_schema)); + _m = &_buffer.back(); } void consume(tombstone t) { @@ -60,7 +84,9 @@ public: if (_as->abort_requested()) { return stop_iteration::yes; } + _buffer_size += sr.memory_usage(*_schema); _m->partition().apply(*_schema, std::move(sr)); + maybe_flush_buffer_mid_partition(); return stop_iteration::no; } @@ -68,7 +94,9 @@ public: if (_as->abort_requested()) { return stop_iteration::yes; } + _buffer_size += cr.memory_usage(*_schema); _m->partition().apply(*_schema, std::move(cr)); + maybe_flush_buffer_mid_partition(); return stop_iteration::no; } @@ -76,14 +104,27 @@ public: if (_as->abort_requested()) { return stop_iteration::yes; } + _buffer_size += rt.memory_usage(*_schema); _m->partition().apply(*_schema, std::move(rt)); + 
+ maybe_flush_buffer_mid_partition(); return stop_iteration::no; } // Expected to be run in seastar::async threaded context (consume_in_thread()) - stop_iteration consume_end_of_partition(); + stop_iteration consume_end_of_partition() { + if (_as->abort_requested()) { + return stop_iteration::yes; + } + if (_buffer_size >= buffer_size_soft_limit) { + do_flush_buffer(); + } + return stop_iteration::no; + } stop_iteration consume_end_of_stream() { + if (!_buffer.empty()) { + do_flush_buffer(); + } return stop_iteration(_as->abort_requested()); } }; From 566e31a5ac5c86419fde59a5d94ca0e571f876ed Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Botond=20D=C3=A9nes?= Date: Thu, 9 Jul 2020 21:13:58 +0300 Subject: [PATCH 04/10] db/view: view_updating_consumer: allow passing custom update pusher So that tests can test the `view_updating_consumer` in isolation, without having to set up the whole database machinery. In addition to less infrastructure setup, this allows more direct checking of mutations pushed for view generation. --- db/view/view.cc | 11 ++++++++++- db/view/view_updating_consumer.hh | 14 ++++++++------ 2 files changed, 18 insertions(+), 7 deletions(-) diff --git a/db/view/view.cc b/db/view/view.cc index ac411b51e6..4b18d59f6d 100644 --- a/db/view/view.cc +++ b/db/view/view.cc @@ -1927,7 +1927,7 @@ void view_updating_consumer::do_flush_buffer() { while (!_buffer.empty()) { try { - auto lock_holder = _table->stream_view_replica_updates(_schema, std::move(_buffer.front()), db::no_timeout, _excluded_sstables).get(); + auto lock_holder = _view_update_pusher(std::move(_buffer.front())).get(); } catch (...) 
{ vlogger.warn("Failed to push replica updates for table {}.{}: {}", _schema->ks_name(), _schema->cf_name(), std::current_exception()); } @@ -1947,5 +1947,14 @@ void view_updating_consumer::maybe_flush_buffer_mid_partition() { } } +view_updating_consumer::view_updating_consumer(schema_ptr schema, table& table, std::vector excluded_sstables, const seastar::abort_source& as, + evictable_reader_handle& staging_reader_handle) + : view_updating_consumer(std::move(schema), as, staging_reader_handle, + [table = table.shared_from_this(), excluded_sstables = std::move(excluded_sstables)] (mutation m) mutable { + auto s = m.schema(); + return table->stream_view_replica_updates(std::move(s), std::move(m), db::no_timeout, excluded_sstables); + }) +{ } + } // namespace view } // namespace db diff --git a/db/view/view_updating_consumer.hh b/db/view/view_updating_consumer.hh index e1fa7457e4..efe1a97532 100644 --- a/db/view/view_updating_consumer.hh +++ b/db/view/view_updating_consumer.hh @@ -45,28 +45,30 @@ public: private: schema_ptr _schema; - lw_shared_ptr
_table; - std::vector _excluded_sstables; const seastar::abort_source* _as; evictable_reader_handle& _staging_reader_handle; circular_buffer _buffer; mutation* _m{nullptr}; size_t _buffer_size{0}; + noncopyable_function(mutation)> _view_update_pusher; private: void do_flush_buffer(); void maybe_flush_buffer_mid_partition(); public: - view_updating_consumer(schema_ptr schema, table& table, std::vector excluded_sstables, const seastar::abort_source& as, - evictable_reader_handle& staging_reader_handle) + // Push updates with a custom pusher. Mainly for tests. + view_updating_consumer(schema_ptr schema, const seastar::abort_source& as, evictable_reader_handle& staging_reader_handle, + noncopyable_function(mutation)> view_update_pusher) : _schema(std::move(schema)) - , _table(table.shared_from_this()) - , _excluded_sstables(std::move(excluded_sstables)) , _as(&as) , _staging_reader_handle(staging_reader_handle) + , _view_update_pusher(std::move(view_update_pusher)) { } + view_updating_consumer(schema_ptr schema, table& table, std::vector excluded_sstables, const seastar::abort_source& as, + evictable_reader_handle& staging_reader_handle); + view_updating_consumer(view_updating_consumer&&) = default; view_updating_consumer& operator=(view_updating_consumer&&) = delete; From 5de0afdab79e96befa4fca2c45885cd6d4956c47 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Botond=20D=C3=A9nes?= Date: Thu, 9 Jul 2020 21:14:35 +0300 Subject: [PATCH 05/10] mutation_reader: expose new_reader_base_cost So that test code can use it. 
--- mutation_reader.cc | 4 ++-- mutation_reader.hh | 2 ++ test/boost/mutation_reader_test.cc | 2 -- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/mutation_reader.cc b/mutation_reader.cc index 4fe7a4638d..f431dc1812 100644 --- a/mutation_reader.cc +++ b/mutation_reader.cc @@ -659,6 +659,8 @@ flat_mutation_reader make_combined_reader(schema_ptr schema, return make_combined_reader(std::move(schema), std::move(v), fwd_sm, fwd_mr); } +const ssize_t new_reader_base_cost{16 * 1024}; + class restricting_mutation_reader : public flat_mutation_reader::impl { struct mutation_source_and_params { mutation_source _ms; @@ -685,8 +687,6 @@ class restricting_mutation_reader : public flat_mutation_reader::impl { }; std::variant _state; - static const ssize_t new_reader_base_cost{16 * 1024}; - template requires std::is_move_constructible::value && requires(Function fn, flat_mutation_reader& reader) { diff --git a/mutation_reader.hh b/mutation_reader.hh index aab6bae6ab..478e355801 100644 --- a/mutation_reader.hh +++ b/mutation_reader.hh @@ -304,6 +304,8 @@ public: mutation_source make_empty_mutation_source(); snapshot_source make_empty_snapshot_source(); +extern const ssize_t new_reader_base_cost; + // Creates a restricted reader whose resource usages will be tracked // during it's lifetime. 
If there are not enough resources (dues to // existing readers) to create the new reader, it's construction will diff --git a/test/boost/mutation_reader_test.cc b/test/boost/mutation_reader_test.cc index 8ba99b3c81..6c532c990b 100644 --- a/test/boost/mutation_reader_test.cc +++ b/test/boost/mutation_reader_test.cc @@ -870,8 +870,6 @@ SEASTAR_TEST_CASE(reader_selector_fast_forwarding_test) { }); } -static const std::size_t new_reader_base_cost{16 * 1024}; - sstables::shared_sstable create_sstable(sstables::test_env& env, simple_schema& sschema, const sstring& path) { std::vector mutations; mutations.reserve(1 << 14); From f264d2b00fd5bf7ee8a64da6a21e17464917a351 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Botond=20D=C3=A9nes?= Date: Thu, 28 May 2020 10:20:53 +0300 Subject: [PATCH 06/10] test: cql_test_env: allow overriding database_config --- test/lib/cql_test_env.cc | 6 +++++- test/lib/cql_test_env.hh | 1 + 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/test/lib/cql_test_env.cc b/test/lib/cql_test_env.cc index c580b2dff5..63e18894e1 100644 --- a/test/lib/cql_test_env.cc +++ b/test/lib/cql_test_env.cc @@ -459,7 +459,11 @@ public: auto stop_storage_service = defer([&ss] { ss.stop().get(); }); database_config dbcfg; - dbcfg.available_memory = memory::stats().total_memory(); + if (cfg_in.dbcfg) { + dbcfg = std::move(*cfg_in.dbcfg); + } else { + dbcfg.available_memory = memory::stats().total_memory(); + } db.start(std::ref(*cfg), dbcfg, std::ref(mm_notif), std::ref(feature_service), std::ref(token_metadata), std::ref(abort_sources)).get(); auto stop_db = defer([&db] { db.stop().get(); diff --git a/test/lib/cql_test_env.hh b/test/lib/cql_test_env.hh index c3f0073458..e3d22fe6c7 100644 --- a/test/lib/cql_test_env.hh +++ b/test/lib/cql_test_env.hh @@ -64,6 +64,7 @@ namespace db { class cql_test_config { public: seastar::shared_ptr db_config; + std::optional dbcfg; std::set disabled_features; cql_test_config(); From aabbdc34ac233b18e91849f7fa74d05ffe3edf73 Mon 
Sep 17 00:00:00 2001 From: =?UTF-8?q?Botond=20D=C3=A9nes?= Date: Fri, 10 Jul 2020 12:09:17 +0300 Subject: [PATCH 07/10] reader_concurrency_semaphore: add initial_resources() To allow tests to reliably calculate the amount of resources they need to consume in order to effectively reduce the resources of the semaphore to a desired amount. Using `available_resources()` is not reliable as it doesn't factor in resources that are consumed at the moment but will be returned later. This will also benefit debugging coredumps where we will now be able to tell how much resources the semaphore was created with and thus calculate the amount of memory and count currently used. --- reader_concurrency_semaphore.hh | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/reader_concurrency_semaphore.hh b/reader_concurrency_semaphore.hh index 89a6fea969..b4e34f2ae6 100644 --- a/reader_concurrency_semaphore.hh +++ b/reader_concurrency_semaphore.hh @@ -105,6 +105,7 @@ private: }; private: + const resources _initial_resources; resources _resources; expiring_fifo _wait_list; @@ -135,7 +136,8 @@ public: sstring name, size_t max_queue_length = std::numeric_limits::max(), std::function prethrow_action = nullptr) - : _resources(count, memory) + : _initial_resources(count, memory) + , _resources(count, memory) , _wait_list(expiry_handler(name)) , _name(std::move(name)) , _max_queue_length(max_queue_length) @@ -193,6 +195,10 @@ public: reader_permit make_permit(); + const resources initial_resources() const { + return _initial_resources; + } + const resources available_resources() const { return _resources; } From e5db1ce7851b17d598178fd91bf3be00d3a056a3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Botond=20D=C3=A9nes?= Date: Fri, 10 Jul 2020 12:10:20 +0300 Subject: [PATCH 08/10] reader_permit: reader_resources: add operator- and operator+ In addition to the already available operator+= and operator-=. 
--- reader_permit.hh | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/reader_permit.hh b/reader_permit.hh index edb94b998b..2436e80520 100644 --- a/reader_permit.hh +++ b/reader_permit.hh @@ -42,12 +42,20 @@ struct reader_resources { return count >= other.count && memory >= other.memory; } + reader_resources operator-(const reader_resources& other) const { + return reader_resources{count - other.count, memory - other.memory}; + } + reader_resources& operator-=(const reader_resources& other) { count -= other.count; memory -= other.memory; return *this; } + reader_resources operator+(const reader_resources& other) const { + return reader_resources{count + other.count, memory + other.memory}; + } + reader_resources& operator+=(const reader_resources& other) { count += other.count; memory += other.memory; From e316796b3fcb3c502ed4a2bde8d0a216ffe8b465 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Botond=20D=C3=A9nes?= Date: Mon, 20 Jul 2020 12:53:05 +0300 Subject: [PATCH 09/10] test/boost: view_build_test: add test test_view_update_generator_deadlock A test case which reproduces the view update generator hang, where the staging reader consumes all resources and leaves none for the pre-image reader which blocks on the semaphore indefinitely. 
--- test/boost/view_build_test.cc | 77 +++++++++++++++++++++++++++++++++++ 1 file changed, 77 insertions(+) diff --git a/test/boost/view_build_test.cc b/test/boost/view_build_test.cc index 46556c3733..c6866f3f90 100644 --- a/test/boost/view_build_test.cc +++ b/test/boost/view_build_test.cc @@ -27,12 +27,15 @@ #include "db/config.hh" #include +#include #include "test/lib/cql_test_env.hh" #include "test/lib/cql_assertions.hh" #include "test/lib/sstable_utils.hh" #include "schema_builder.hh" #include "service/priority_manager.hh" #include "test/lib/test_services.hh" +#include "test/lib/data_model.hh" +#include "test/lib/log.hh" using namespace std::literals::chrono_literals; @@ -495,3 +498,77 @@ SEASTAR_TEST_CASE(test_view_update_generator) { BOOST_REQUIRE_EQUAL(view_update_generator.available_register_units(), db::view::view_update_generator::registration_queue_size); }); } + +SEASTAR_THREAD_TEST_CASE(test_view_update_generator_deadlock) { + cql_test_config test_cfg; + auto& db_cfg = *test_cfg.db_config; + + db_cfg.enable_cache(false); + db_cfg.enable_commitlog(false); + + test_cfg.dbcfg.emplace(); + test_cfg.dbcfg->available_memory = memory::stats().total_memory(); + test_cfg.dbcfg->statement_scheduling_group = seastar::create_scheduling_group("statement", 1000).get0(); + test_cfg.dbcfg->streaming_scheduling_group = seastar::create_scheduling_group("streaming", 200).get0(); + + do_with_cql_env([] (cql_test_env& e) -> future<> { + e.execute_cql("create table t (p text, c text, v text, primary key (p, c))").get(); + e.execute_cql("create materialized view tv as select * from t " + "where p is not null and c is not null and v is not null " + "primary key (v, c, p)").get(); + + auto msb = e.local_db().get_config().murmur3_partitioner_ignore_msb_bits(); + auto key1 = token_generation_for_shard(1, this_shard_id(), msb).front().first; + + for (auto i = 0; i < 1024; ++i) { + e.execute_cql(fmt::format("insert into t (p, c, v) values ('{}', 'c{}', 'x')", key1, i)).get(); + } 
+ + // We need data on the disk so that the pre-image reader is forced to go to disk. + e.db().invoke_on_all([] (database& db) { + return db.flush_all_memtables(); + }).get(); + + auto& view_update_generator = e.local_view_update_generator(); + auto s = test_table_schema(); + + lw_shared_ptr
t = e.local_db().find_column_family("ks", "t").shared_from_this(); + + auto key = partition_key::from_exploded(*s, {to_bytes(key1)}); + mutation m(s, key); + auto col = s->get_column_definition("v"); + const auto filler_val_size = 4 * 1024; + const auto filler_val = sstring(filler_val_size, 'a'); + for (int i = 0; i < 1024; ++i) { + auto& row = m.partition().clustered_row(*s, clustering_key::from_exploded(*s, {to_bytes(fmt::format("c{}", i))})); + row.cells().apply(*col, atomic_cell::make_live(*col->type, 2345, col->type->decompose(filler_val))); + } + + auto sst = t->make_streaming_staging_sstable(); + sstables::sstable_writer_config sst_cfg = test_sstables_manager.configure_writer(); + auto& pc = service::get_local_streaming_priority(); + + sst->write_components(flat_mutation_reader_from_mutations({m}), 1ul, s, sst_cfg, {}, pc).get(); + sst->open_data().get(); + t->add_sstable_and_update_cache(sst).get(); + + auto& sem = *with_scheduling_group(e.local_db().get_streaming_scheduling_group(), [&] () { + return &e.local_db().make_query_class_config().semaphore; + }).get0(); + + // consume all units except what is needed to admit a single reader. + sem.consume(sem.initial_resources() - reader_resources{1, new_reader_base_cost}); + + testlog.info("res = [.count={}, .memory={}]", sem.available_resources().count, sem.available_resources().memory); + + BOOST_REQUIRE_EQUAL(sem.get_inactive_read_stats().permit_based_evictions, 0); + + view_update_generator.register_staging_sstable(sst, t).get(); + + eventually_true([&] { + return sem.get_inactive_read_stats().permit_based_evictions > 0; + }); + + return make_ready_future<>(); + }, std::move(test_cfg)).get(); +} From 929cdd3a15dacda6635a1e1bb3116dde73be7d88 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Botond=20D=C3=A9nes?= Date: Thu, 9 Jul 2020 21:15:12 +0300 Subject: [PATCH 10/10] test/boost: view_build_test: add test_view_update_generator_buffering To exercise the new buffering and pausing logic of the view updating consumer. 
--- test/boost/view_build_test.cc | 181 ++++++++++++++++++++++++++++++++++ 1 file changed, 181 insertions(+) diff --git a/test/boost/view_build_test.cc b/test/boost/view_build_test.cc index c6866f3f90..8c91b5217d 100644 --- a/test/boost/view_build_test.cc +++ b/test/boost/view_build_test.cc @@ -36,6 +36,7 @@ #include "test/lib/test_services.hh" #include "test/lib/data_model.hh" #include "test/lib/log.hh" +#include "utils/ranges.hh" using namespace std::literals::chrono_literals; @@ -572,3 +573,183 @@ SEASTAR_THREAD_TEST_CASE(test_view_update_generator_deadlock) { return make_ready_future<>(); }, std::move(test_cfg)).get(); } + +SEASTAR_THREAD_TEST_CASE(test_view_update_generator_buffering) { + using partition_size_map = std::map; + + class consumer_verifier { + schema_ptr _schema; + reader_permit _permit; + const partition_size_map& _partition_rows; + std::vector& _collected_muts; + bool& _failed; + std::unique_ptr _rl; + std::unique_ptr _rl_stats; + clustering_key::less_compare _less_cmp; + const size_t _max_rows_soft; + const size_t _max_rows_hard; + size_t _buffer_rows = 0; + + private: + static size_t rows_in_limit(size_t l) { + const size_t _100kb = 100 * 1024; + // round up + return l / _100kb + std::min(size_t(1), l % _100kb); + } + + static size_t rows_in_mut(const mutation& m) { + return std::distance(m.partition().clustered_rows().begin(), m.partition().clustered_rows().end()); + } + + void check(mutation mut) { + // First we check that we would be able to create a reader, even + // though the staging reader consumed all resources. 
+ auto fut = _permit.wait_admission(new_reader_base_cost, db::timeout_clock::now()); + BOOST_REQUIRE(!fut.failed()); + auto res_units = fut.get0(); + + const size_t current_rows = rows_in_mut(mut); + const auto total_rows = _partition_rows.at(mut.decorated_key()); + _buffer_rows += current_rows; + + testlog.trace("consumer_verifier::check(): key={}, rows={}/{}, _buffer={}", + partition_key::with_schema_wrapper(*_schema, mut.key()), + current_rows, + total_rows, + _buffer_rows); + + BOOST_REQUIRE(current_rows); + BOOST_REQUIRE(current_rows <= _max_rows_hard); + BOOST_REQUIRE(_buffer_rows <= _max_rows_hard); + + // The current partition doesn't have all of its rows yet, verify + // that the new mutation contains the next rows for the same + // partition + if (!_collected_muts.empty() && rows_in_mut(_collected_muts.back()) < _partition_rows.at(_collected_muts.back().decorated_key())) { + BOOST_REQUIRE(_collected_muts.back().decorated_key().equal(*mut.schema(), mut.decorated_key())); + const auto& previous_ckey = (--_collected_muts.back().partition().clustered_rows().end())->key(); + const auto& next_ckey = mut.partition().clustered_rows().begin()->key(); + BOOST_REQUIRE(_less_cmp(previous_ckey, next_ckey)); + mutation_application_stats stats; + _collected_muts.back().partition().apply(*_schema, mut.partition(), *mut.schema(), stats); + // The new mutation is a new partition. 
+ } else { + if (!_collected_muts.empty()) { + BOOST_REQUIRE(!_collected_muts.back().decorated_key().equal(*mut.schema(), mut.decorated_key())); + } + _collected_muts.push_back(std::move(mut)); + } + + if (_buffer_rows >= _max_rows_hard) { // buffer flushed on hard limit + _buffer_rows = 0; + testlog.trace("consumer_verifier::check(): buffer ends on hard limit"); + } else if (_buffer_rows >= _max_rows_soft) { // buffer flushed on soft limit + _buffer_rows = 0; + testlog.trace("consumer_verifier::check(): buffer ends on soft limit"); + } + } + + public: + consumer_verifier(schema_ptr schema, reader_permit permit, const partition_size_map& partition_rows, std::vector& collected_muts, bool& failed) + : _schema(std::move(schema)) + , _permit(std::move(permit)) + , _partition_rows(partition_rows) + , _collected_muts(collected_muts) + , _failed(failed) + , _rl(std::make_unique(_schema)) + , _rl_stats(std::make_unique()) + , _less_cmp(*_schema) + , _max_rows_soft(rows_in_limit(db::view::view_updating_consumer::buffer_size_soft_limit)) + , _max_rows_hard(rows_in_limit(db::view::view_updating_consumer::buffer_size_hard_limit)) + { } + + future operator()(mutation mut) { + try { + check(std::move(mut)); + } catch (...) 
{ + testlog.error("consumer_verifier::operator(): caught unexpected exception {}", std::current_exception()); + _failed |= true; + } + return _rl->lock_pk(_collected_muts.back().decorated_key(), true, db::no_timeout, *_rl_stats); + } + }; + + reader_concurrency_semaphore sem(1, new_reader_base_cost, get_name()); + + auto schema = schema_builder("ks", "cf") + .with_column("pk", int32_type, column_kind::partition_key) + .with_column("ck", int32_type, column_kind::clustering_key) + .with_column("v", bytes_type) + .build(); + + const auto blob_100kb = bytes(100 * 1024, bytes::value_type(0xab)); + const abort_source as; + + const auto partition_size_sets = std::vector>{{12}, {8, 4}, {8, 16}, {22}, {8, 8, 8, 8}, {8, 8, 8, 16, 8}, {8, 20, 16, 16}, {50}, {21}, {21, 2}}; + const auto max_partition_set_size = std::ranges::max_element(partition_size_sets, [] (const std::vector& a, const std::vector& b) { return a.size() < b.size(); })->size(); + auto pkeys = ranges::to>(std::views::iota(size_t{0}, max_partition_set_size) | std::views::transform([schema] (int i) { + return dht::decorate_key(*schema, partition_key::from_single_value(*schema, int32_type->decompose(data_value(i)))); + })); + std::ranges::sort(pkeys, dht::ring_position_less_comparator(*schema)); + + for (auto partition_sizes_100kb : partition_size_sets) { + testlog.debug("partition_sizes_100kb={}", partition_sizes_100kb); + partition_size_map partition_rows{dht::ring_position_less_comparator(*schema)}; + std::vector muts; + auto pk = 0; + for (auto partition_size_100kb : partition_sizes_100kb) { + auto mut_desc = tests::data_model::mutation_description(pkeys.at(pk++).key().explode(*schema)); + for (auto ck = 0; ck < partition_size_100kb; ++ck) { + mut_desc.add_clustered_cell({int32_type->decompose(data_value(ck))}, "v", tests::data_model::mutation_description::value(blob_100kb)); + } + muts.push_back(mut_desc.build(schema)); + partition_rows.emplace(muts.back().decorated_key(), partition_size_100kb); + } + + 
std::ranges::sort(muts, [less = dht::ring_position_less_comparator(*schema)] (const mutation& a, const mutation& b) { + return less(a.decorated_key(), b.decorated_key()); + }); + + auto permit = sem.make_permit(); + + auto mt = make_lw_shared(schema); + for (const auto& mut : muts) { + mt->apply(mut); + } + + auto ms = mutation_source([mt] ( + schema_ptr s, + reader_permit permit, + const dht::partition_range& pr, + const query::partition_slice& ps, + const io_priority_class& pc, + tracing::trace_state_ptr ts, + streamed_mutation::forwarding fwd_ms, + mutation_reader::forwarding fwd_mr) { + return make_restricted_flat_reader(mt->as_data_source(), s, std::move(permit), pr, ps, pc, std::move(ts), fwd_ms, fwd_mr); + }); + auto [staging_reader, staging_reader_handle] = make_manually_paused_evictable_reader( + std::move(ms), + schema, + permit, + query::full_partition_range, + schema->full_slice(), + service::get_local_streaming_priority(), + nullptr, + ::mutation_reader::forwarding::no); + + std::vector collected_muts; + bool failed = false; + + staging_reader.consume_in_thread(db::view::view_updating_consumer(schema, as, staging_reader_handle, + consumer_verifier(schema, permit, partition_rows, collected_muts, failed)), db::no_timeout); + + BOOST_REQUIRE(!failed); + + BOOST_REQUIRE_EQUAL(muts.size(), collected_muts.size()); + for (size_t i = 0; i < muts.size(); ++i) { + testlog.trace("compare mutation {}", i); + BOOST_REQUIRE_EQUAL(muts[i], collected_muts[i]); + } + } +}