diff --git a/database.hh b/database.hh index 15bdc19917..d0fb002b74 100644 --- a/database.hh +++ b/database.hh @@ -1024,6 +1024,10 @@ flat_mutation_reader make_local_shard_sstable_reader(schema_ptr s, mutation_reader::forwarding fwd_mr, sstables::read_monitor_generator& monitor_generator = sstables::default_read_monitor_generator()); +/// Read a range from the passed-in sstables. +/// +/// The reader is unrestricted, but will account its resource usage on the +/// semaphore belonging to the passed-in permit. flat_mutation_reader make_range_sstable_reader(schema_ptr s, reader_permit permit, lw_shared_ptr sstables, @@ -1035,6 +1039,21 @@ flat_mutation_reader make_range_sstable_reader(schema_ptr s, mutation_reader::forwarding fwd_mr, sstables::read_monitor_generator& monitor_generator = sstables::default_read_monitor_generator()); +/// Read a range from the passed-in sstables. +/// +/// The reader is restricted, that is it will wait for admission on the semaphore +/// belonging to the passed-in permit, before starting to read. 
+flat_mutation_reader make_restricted_range_sstable_reader(schema_ptr s, + reader_permit permit, + lw_shared_ptr sstables, + const dht::partition_range& pr, + const query::partition_slice& slice, + const io_priority_class& pc, + tracing::trace_state_ptr trace_state, + streamed_mutation::forwarding fwd, + mutation_reader::forwarding fwd_mr, + sstables::read_monitor_generator& monitor_generator = sstables::default_read_monitor_generator()); + class user_types_metadata; class keyspace_metadata final { diff --git a/db/view/view.cc b/db/view/view.cc index b2b9dafc3d..4b18d59f6d 100644 --- a/db/view/view.cc +++ b/db/view/view.cc @@ -58,6 +58,7 @@ #include "cql3/util.hh" #include "db/view/view.hh" #include "db/view/view_builder.hh" +#include "db/view/view_updating_consumer.hh" #include "db/system_keyspace_view_types.hh" #include "db/system_keyspace.hh" #include "frozen_mutation.hh" @@ -1912,5 +1913,48 @@ future check_needs_view_update_path(db::system_distributed_keyspace& sys_d }); } +const size_t view_updating_consumer::buffer_size_soft_limit{1 * 1024 * 1024}; +const size_t view_updating_consumer::buffer_size_hard_limit{2 * 1024 * 1024}; + +void view_updating_consumer::do_flush_buffer() { + _staging_reader_handle.pause(); + + if (_buffer.front().partition().empty()) { + // If we flushed mid-partition we can have an empty mutation if we + // flushed right before getting the end-of-partition fragment. + _buffer.pop_front(); + } + + while (!_buffer.empty()) { + try { + auto lock_holder = _view_update_pusher(std::move(_buffer.front())).get(); + } catch (...) 
{ + vlogger.warn("Failed to push replica updates for table {}.{}: {}", _schema->ks_name(), _schema->cf_name(), std::current_exception()); + } + _buffer.pop_front(); + } + + _buffer_size = 0; + _m = nullptr; +} + +void view_updating_consumer::maybe_flush_buffer_mid_partition() { + if (_buffer_size >= buffer_size_hard_limit) { + auto m = mutation(_schema, _m->decorated_key(), mutation_partition(_schema)); + do_flush_buffer(); + _buffer.emplace_back(std::move(m)); + _m = &_buffer.back(); + } +} + +view_updating_consumer::view_updating_consumer(schema_ptr schema, table& table, std::vector excluded_sstables, const seastar::abort_source& as, + evictable_reader_handle& staging_reader_handle) + : view_updating_consumer(std::move(schema), as, staging_reader_handle, + [table = table.shared_from_this(), excluded_sstables = std::move(excluded_sstables)] (mutation m) mutable { + auto s = m.schema(); + return table->stream_view_replica_updates(std::move(s), std::move(m), db::no_timeout, excluded_sstables); + }) +{ } + } // namespace view } // namespace db diff --git a/db/view/view_update_generator.cc b/db/view/view_update_generator.cc index 4fdc363887..624d192e5f 100644 --- a/db/view/view_update_generator.cc +++ b/db/view/view_update_generator.cc @@ -64,18 +64,29 @@ future<> view_update_generator::start() { ssts->insert(sst); } - flat_mutation_reader staging_sstable_reader = ::make_range_sstable_reader(s, + auto ms = mutation_source([ssts] ( + schema_ptr s, + reader_permit permit, + const dht::partition_range& pr, + const query::partition_slice& ps, + const io_priority_class& pc, + tracing::trace_state_ptr ts, + streamed_mutation::forwarding fwd_ms, + mutation_reader::forwarding fwd_mr) { + /* ssts is passed as a copy: the capture is const in this non-mutable lambda, so std::move() on it would only have copied anyway */ return ::make_restricted_range_sstable_reader(s, std::move(permit), ssts, pr, ps, pc, std::move(ts), fwd_ms, fwd_mr); + }); + auto [staging_sstable_reader, staging_sstable_reader_handle] = make_manually_paused_evictable_reader( + std::move(ms), + s, 
_db.make_query_class_config().semaphore.make_permit(), - std::move(ssts), query::full_partition_range, s->full_slice(), service::get_local_streaming_priority(), nullptr, - ::streamed_mutation::forwarding::no, ::mutation_reader::forwarding::no); inject_failure("view_update_generator_consume_staging_sstable"); - auto result = staging_sstable_reader.consume_in_thread(view_updating_consumer(s, *t, sstables, _as), db::no_timeout); + auto result = staging_sstable_reader.consume_in_thread(view_updating_consumer(s, *t, sstables, _as, staging_sstable_reader_handle), db::no_timeout); if (result == stop_iteration::yes) { break; } diff --git a/db/view/view_updating_consumer.hh b/db/view/view_updating_consumer.hh index ee0a2bf232..efe1a97532 100644 --- a/db/view/view_updating_consumer.hh +++ b/db/view/view_updating_consumer.hh @@ -27,6 +27,8 @@ #include "sstables/shared_sstable.hh" #include "database.hh" +class evictable_reader_handle; + namespace db::view { /* @@ -34,22 +36,46 @@ namespace db::view { * It is expected to be run in seastar::async threaded context through consume_in_thread() */ class view_updating_consumer { - schema_ptr _schema; - lw_shared_ptr _table; - std::vector _excluded_sstables; - const seastar::abort_source* _as; - std::optional _m; public: - view_updating_consumer(schema_ptr schema, table& table, std::vector excluded_sstables, const seastar::abort_source& as) + // We prefer flushing on partition boundaries, so at the end of a partition, + // we flush on reaching the soft limit. Otherwise we continue accumulating + // data. We flush mid-partition if we reach the hard limit. 
+ static const size_t buffer_size_soft_limit; + static const size_t buffer_size_hard_limit; + +private: + schema_ptr _schema; + const seastar::abort_source* _as; + evictable_reader_handle& _staging_reader_handle; + circular_buffer _buffer; + mutation* _m{nullptr}; + size_t _buffer_size{0}; + noncopyable_function(mutation)> _view_update_pusher; + +private: + void do_flush_buffer(); + void maybe_flush_buffer_mid_partition(); + +public: + // Push updates with a custom pusher. Mainly for tests. + view_updating_consumer(schema_ptr schema, const seastar::abort_source& as, evictable_reader_handle& staging_reader_handle, + noncopyable_function(mutation)> view_update_pusher) : _schema(std::move(schema)) - , _table(table.shared_from_this()) - , _excluded_sstables(std::move(excluded_sstables)) , _as(&as) - , _m() + , _staging_reader_handle(staging_reader_handle) + , _view_update_pusher(std::move(view_update_pusher)) { } + view_updating_consumer(schema_ptr schema, table& table, std::vector excluded_sstables, const seastar::abort_source& as, + evictable_reader_handle& staging_reader_handle); + + view_updating_consumer(view_updating_consumer&&) = default; + + view_updating_consumer& operator=(view_updating_consumer&&) = delete; + void consume_new_partition(const dht::decorated_key& dk) { - _m = mutation(_schema, dk, mutation_partition(_schema)); + _buffer.emplace_back(_schema, dk, mutation_partition(_schema)); + _m = &_buffer.back(); } void consume(tombstone t) { @@ -60,7 +86,9 @@ public: if (_as->abort_requested()) { return stop_iteration::yes; } + _buffer_size += sr.memory_usage(*_schema); _m->partition().apply(*_schema, std::move(sr)); + maybe_flush_buffer_mid_partition(); return stop_iteration::no; } @@ -68,7 +96,9 @@ public: if (_as->abort_requested()) { return stop_iteration::yes; } + _buffer_size += cr.memory_usage(*_schema); _m->partition().apply(*_schema, std::move(cr)); + maybe_flush_buffer_mid_partition(); return stop_iteration::no; } @@ -76,14 +106,27 @@ public: if 
(_as->abort_requested()) { return stop_iteration::yes; } + _buffer_size += rt.memory_usage(*_schema); _m->partition().apply(*_schema, std::move(rt)); + maybe_flush_buffer_mid_partition(); return stop_iteration::no; } // Expected to be run in seastar::async threaded context (consume_in_thread()) - stop_iteration consume_end_of_partition(); + stop_iteration consume_end_of_partition() { + if (_as->abort_requested()) { + return stop_iteration::yes; + } + if (_buffer_size >= buffer_size_soft_limit) { + do_flush_buffer(); + } + return stop_iteration::no; + } stop_iteration consume_end_of_stream() { + if (!_buffer.empty()) { + do_flush_buffer(); + } return stop_iteration(_as->abort_requested()); } }; diff --git a/mutation_reader.cc b/mutation_reader.cc index 4fe7a4638d..f431dc1812 100644 --- a/mutation_reader.cc +++ b/mutation_reader.cc @@ -659,6 +659,8 @@ flat_mutation_reader make_combined_reader(schema_ptr schema, return make_combined_reader(std::move(schema), std::move(v), fwd_sm, fwd_mr); } +const ssize_t new_reader_base_cost{16 * 1024}; + class restricting_mutation_reader : public flat_mutation_reader::impl { struct mutation_source_and_params { mutation_source _ms; @@ -685,8 +687,6 @@ class restricting_mutation_reader : public flat_mutation_reader::impl { }; std::variant _state; - static const ssize_t new_reader_base_cost{16 * 1024}; - template requires std::is_move_constructible::value && requires(Function fn, flat_mutation_reader& reader) { diff --git a/mutation_reader.hh b/mutation_reader.hh index aab6bae6ab..478e355801 100644 --- a/mutation_reader.hh +++ b/mutation_reader.hh @@ -304,6 +304,8 @@ public: mutation_source make_empty_mutation_source(); snapshot_source make_empty_snapshot_source(); +extern const ssize_t new_reader_base_cost; + // Creates a restricted reader whose resource usages will be tracked // during it's lifetime. 
If there are not enough resources (dues to // existing readers) to create the new reader, it's construction will diff --git a/reader_concurrency_semaphore.hh b/reader_concurrency_semaphore.hh index 89a6fea969..b4e34f2ae6 100644 --- a/reader_concurrency_semaphore.hh +++ b/reader_concurrency_semaphore.hh @@ -105,6 +105,7 @@ private: }; private: + const resources _initial_resources; resources _resources; expiring_fifo _wait_list; @@ -135,7 +136,8 @@ public: sstring name, size_t max_queue_length = std::numeric_limits::max(), std::function prethrow_action = nullptr) - : _resources(count, memory) + : _initial_resources(count, memory) + , _resources(count, memory) , _wait_list(expiry_handler(name)) , _name(std::move(name)) , _max_queue_length(max_queue_length) @@ -193,6 +195,10 @@ public: reader_permit make_permit(); + const resources initial_resources() const { + return _initial_resources; + } + const resources available_resources() const { return _resources; } diff --git a/reader_permit.hh b/reader_permit.hh index edb94b998b..2436e80520 100644 --- a/reader_permit.hh +++ b/reader_permit.hh @@ -42,12 +42,20 @@ struct reader_resources { return count >= other.count && memory >= other.memory; } + reader_resources operator-(const reader_resources& other) const { + return reader_resources{count - other.count, memory - other.memory}; + } + reader_resources& operator-=(const reader_resources& other) { count -= other.count; memory -= other.memory; return *this; } + reader_resources operator+(const reader_resources& other) const { + return reader_resources{count + other.count, memory + other.memory}; + } + reader_resources& operator+=(const reader_resources& other) { count += other.count; memory += other.memory; diff --git a/table.cc b/table.cc index 30bb86ee0d..fc51b4967e 100644 --- a/table.cc +++ b/table.cc @@ -23,7 +23,6 @@ #include "sstables/sstables.hh" #include "sstables/sstables_manager.hh" #include "service/priority_manager.hh" -#include "db/view/view_updating_consumer.hh" 
#include "db/schema_tables.hh" #include "cell_locking.hh" #include "mutation_fragment.hh" @@ -327,6 +326,32 @@ flat_mutation_reader make_range_sstable_reader(schema_ptr s, fwd_mr); } +flat_mutation_reader make_restricted_range_sstable_reader(schema_ptr s, + reader_permit permit, + lw_shared_ptr sstables, + const dht::partition_range& pr, + const query::partition_slice& slice, + const io_priority_class& pc, + tracing::trace_state_ptr trace_state, + streamed_mutation::forwarding fwd, + mutation_reader::forwarding fwd_mr, + sstables::read_monitor_generator& monitor_generator) +{ + /* NOTE(review): monitor_generator is captured by reference below; presumably it must outlive the returned reader - confirm against callers */ auto ms = mutation_source([sstables=std::move(sstables), &monitor_generator] ( + schema_ptr s, + reader_permit permit, + const dht::partition_range& pr, + const query::partition_slice& slice, + const io_priority_class& pc, + tracing::trace_state_ptr trace_state, + streamed_mutation::forwarding fwd, + mutation_reader::forwarding fwd_mr) { + /* sstables is passed as a copy: the capture is const in this non-mutable lambda, so std::move() on it would only have copied anyway */ return make_range_sstable_reader(std::move(s), std::move(permit), sstables, pr, slice, pc, + std::move(trace_state), fwd, fwd_mr, monitor_generator); + }); + return make_restricted_flat_reader(std::move(ms), std::move(s), std::move(permit), pr, slice, pc, std::move(trace_state), fwd, fwd_mr); +} + flat_mutation_reader table::make_sstable_reader(schema_ptr s, reader_permit permit, @@ -2279,16 +2304,3 @@ table::as_mutation_source_excluding(std::vector& ssts) return this->make_reader_excluding_sstables(std::move(s), std::move(permit), ssts, range, slice, pc, std::move(trace_state), fwd, fwd_mr); }); } - -stop_iteration db::view::view_updating_consumer::consume_end_of_partition() { - if (_as->abort_requested()) { - return stop_iteration::yes; - } - try { - auto lock_holder = _table->stream_view_replica_updates(_schema, std::move(*_m), db::no_timeout, _excluded_sstables).get(); - } catch (...) 
{ - tlogger.warn("Failed to push replica updates for table {}.{}: {}", _schema->ks_name(), _schema->cf_name(), std::current_exception()); - } - _m.reset(); - return stop_iteration::no; -} diff --git a/test/boost/mutation_reader_test.cc b/test/boost/mutation_reader_test.cc index 8ba99b3c81..6c532c990b 100644 --- a/test/boost/mutation_reader_test.cc +++ b/test/boost/mutation_reader_test.cc @@ -870,8 +870,6 @@ SEASTAR_TEST_CASE(reader_selector_fast_forwarding_test) { }); } -static const std::size_t new_reader_base_cost{16 * 1024}; - sstables::shared_sstable create_sstable(sstables::test_env& env, simple_schema& sschema, const sstring& path) { std::vector mutations; mutations.reserve(1 << 14); diff --git a/test/boost/view_build_test.cc b/test/boost/view_build_test.cc index 46556c3733..8c91b5217d 100644 --- a/test/boost/view_build_test.cc +++ b/test/boost/view_build_test.cc @@ -27,12 +27,16 @@ #include "db/config.hh" #include +#include #include "test/lib/cql_test_env.hh" #include "test/lib/cql_assertions.hh" #include "test/lib/sstable_utils.hh" #include "schema_builder.hh" #include "service/priority_manager.hh" #include "test/lib/test_services.hh" +#include "test/lib/data_model.hh" +#include "test/lib/log.hh" +#include "utils/ranges.hh" using namespace std::literals::chrono_literals; @@ -495,3 +499,257 @@ SEASTAR_TEST_CASE(test_view_update_generator) { BOOST_REQUIRE_EQUAL(view_update_generator.available_register_units(), db::view::view_update_generator::registration_queue_size); }); } + +SEASTAR_THREAD_TEST_CASE(test_view_update_generator_deadlock) { + cql_test_config test_cfg; + auto& db_cfg = *test_cfg.db_config; + + db_cfg.enable_cache(false); + db_cfg.enable_commitlog(false); + + test_cfg.dbcfg.emplace(); + test_cfg.dbcfg->available_memory = memory::stats().total_memory(); + test_cfg.dbcfg->statement_scheduling_group = seastar::create_scheduling_group("statement", 1000).get0(); + test_cfg.dbcfg->streaming_scheduling_group = 
seastar::create_scheduling_group("streaming", 200).get0(); + + do_with_cql_env([] (cql_test_env& e) -> future<> { + e.execute_cql("create table t (p text, c text, v text, primary key (p, c))").get(); + e.execute_cql("create materialized view tv as select * from t " + "where p is not null and c is not null and v is not null " + "primary key (v, c, p)").get(); + + auto msb = e.local_db().get_config().murmur3_partitioner_ignore_msb_bits(); + auto key1 = token_generation_for_shard(1, this_shard_id(), msb).front().first; + + for (auto i = 0; i < 1024; ++i) { + e.execute_cql(fmt::format("insert into t (p, c, v) values ('{}', 'c{}', 'x')", key1, i)).get(); + } + + // We need data on the disk so that the pre-image reader is forced to go to disk. + e.db().invoke_on_all([] (database& db) { + return db.flush_all_memtables(); + }).get(); + + auto& view_update_generator = e.local_view_update_generator(); + auto s = test_table_schema(); + + lw_shared_ptr
t = e.local_db().find_column_family("ks", "t").shared_from_this(); + + auto key = partition_key::from_exploded(*s, {to_bytes(key1)}); + mutation m(s, key); + auto col = s->get_column_definition("v"); + const auto filler_val_size = 4 * 1024; + const auto filler_val = sstring(filler_val_size, 'a'); + for (int i = 0; i < 1024; ++i) { + auto& row = m.partition().clustered_row(*s, clustering_key::from_exploded(*s, {to_bytes(fmt::format("c{}", i))})); + row.cells().apply(*col, atomic_cell::make_live(*col->type, 2345, col->type->decompose(filler_val))); + } + + auto sst = t->make_streaming_staging_sstable(); + sstables::sstable_writer_config sst_cfg = test_sstables_manager.configure_writer(); + auto& pc = service::get_local_streaming_priority(); + + sst->write_components(flat_mutation_reader_from_mutations({m}), 1ul, s, sst_cfg, {}, pc).get(); + sst->open_data().get(); + t->add_sstable_and_update_cache(sst).get(); + + auto& sem = *with_scheduling_group(e.local_db().get_streaming_scheduling_group(), [&] () { + return &e.local_db().make_query_class_config().semaphore; + }).get0(); + + // consume all units except what is needed to admit a single reader. 
+ sem.consume(sem.initial_resources() - reader_resources{1, new_reader_base_cost}); + + testlog.info("res = [.count={}, .memory={}]", sem.available_resources().count, sem.available_resources().memory); + + BOOST_REQUIRE_EQUAL(sem.get_inactive_read_stats().permit_based_evictions, 0); + + view_update_generator.register_staging_sstable(sst, t).get(); + + eventually_true([&] { + return sem.get_inactive_read_stats().permit_based_evictions > 0; + }); + + return make_ready_future<>(); + }, std::move(test_cfg)).get(); +} + +SEASTAR_THREAD_TEST_CASE(test_view_update_generator_buffering) { + using partition_size_map = std::map; + + class consumer_verifier { + schema_ptr _schema; + reader_permit _permit; + const partition_size_map& _partition_rows; + std::vector& _collected_muts; + bool& _failed; + std::unique_ptr _rl; + std::unique_ptr _rl_stats; + clustering_key::less_compare _less_cmp; + const size_t _max_rows_soft; + const size_t _max_rows_hard; + size_t _buffer_rows = 0; + + private: + static size_t rows_in_limit(size_t l) { + const size_t _100kb = 100 * 1024; + // round up + return l / _100kb + std::min(size_t(1), l % _100kb); + } + + static size_t rows_in_mut(const mutation& m) { + return std::distance(m.partition().clustered_rows().begin(), m.partition().clustered_rows().end()); + } + + void check(mutation mut) { + // First we check that we would be able to create a reader, even + // though the staging reader consumed all resources. 
+ auto fut = _permit.wait_admission(new_reader_base_cost, db::timeout_clock::now()); + BOOST_REQUIRE(!fut.failed()); + auto res_units = fut.get0(); + + const size_t current_rows = rows_in_mut(mut); + const auto total_rows = _partition_rows.at(mut.decorated_key()); + _buffer_rows += current_rows; + + testlog.trace("consumer_verifier::check(): key={}, rows={}/{}, _buffer={}", + partition_key::with_schema_wrapper(*_schema, mut.key()), + current_rows, + total_rows, + _buffer_rows); + + BOOST_REQUIRE(current_rows); + BOOST_REQUIRE(current_rows <= _max_rows_hard); + BOOST_REQUIRE(_buffer_rows <= _max_rows_hard); + + // The current partition doesn't have all of its rows yet, verify + // that the new mutation contains the next rows for the same + // partition + if (!_collected_muts.empty() && rows_in_mut(_collected_muts.back()) < _partition_rows.at(_collected_muts.back().decorated_key())) { + BOOST_REQUIRE(_collected_muts.back().decorated_key().equal(*mut.schema(), mut.decorated_key())); + const auto& previous_ckey = (--_collected_muts.back().partition().clustered_rows().end())->key(); + const auto& next_ckey = mut.partition().clustered_rows().begin()->key(); + BOOST_REQUIRE(_less_cmp(previous_ckey, next_ckey)); + mutation_application_stats stats; + _collected_muts.back().partition().apply(*_schema, mut.partition(), *mut.schema(), stats); + // The new mutation is a new partition. 
+ } else { + if (!_collected_muts.empty()) { + BOOST_REQUIRE(!_collected_muts.back().decorated_key().equal(*mut.schema(), mut.decorated_key())); + } + _collected_muts.push_back(std::move(mut)); + } + + if (_buffer_rows >= _max_rows_hard) { // buffer flushed on hard limit + _buffer_rows = 0; + testlog.trace("consumer_verifier::check(): buffer ends on hard limit"); + } else if (_buffer_rows >= _max_rows_soft) { // buffer flushed on soft limit + _buffer_rows = 0; + testlog.trace("consumer_verifier::check(): buffer ends on soft limit"); + } + } + + public: + consumer_verifier(schema_ptr schema, reader_permit permit, const partition_size_map& partition_rows, std::vector& collected_muts, bool& failed) + : _schema(std::move(schema)) + , _permit(std::move(permit)) + , _partition_rows(partition_rows) + , _collected_muts(collected_muts) + , _failed(failed) + , _rl(std::make_unique(_schema)) + , _rl_stats(std::make_unique()) + , _less_cmp(*_schema) + , _max_rows_soft(rows_in_limit(db::view::view_updating_consumer::buffer_size_soft_limit)) + , _max_rows_hard(rows_in_limit(db::view::view_updating_consumer::buffer_size_hard_limit)) + { } + + future operator()(mutation mut) { + try { + check(std::move(mut)); + } catch (...) 
{ + testlog.error("consumer_verifier::operator(): caught unexpected exception {}", std::current_exception()); + _failed |= true; + } + return _rl->lock_pk(_collected_muts.back().decorated_key(), true, db::no_timeout, *_rl_stats); + } + }; + + reader_concurrency_semaphore sem(1, new_reader_base_cost, get_name()); + + auto schema = schema_builder("ks", "cf") + .with_column("pk", int32_type, column_kind::partition_key) + .with_column("ck", int32_type, column_kind::clustering_key) + .with_column("v", bytes_type) + .build(); + + const auto blob_100kb = bytes(100 * 1024, bytes::value_type(0xab)); + const abort_source as; + + const auto partition_size_sets = std::vector>{{12}, {8, 4}, {8, 16}, {22}, {8, 8, 8, 8}, {8, 8, 8, 16, 8}, {8, 20, 16, 16}, {50}, {21}, {21, 2}}; + const auto max_partition_set_size = std::ranges::max_element(partition_size_sets, [] (const std::vector& a, const std::vector& b) { return a.size() < b.size(); })->size(); + auto pkeys = ranges::to>(std::views::iota(size_t{0}, max_partition_set_size) | std::views::transform([schema] (int i) { + return dht::decorate_key(*schema, partition_key::from_single_value(*schema, int32_type->decompose(data_value(i)))); + })); + std::ranges::sort(pkeys, dht::ring_position_less_comparator(*schema)); + + for (auto partition_sizes_100kb : partition_size_sets) { + testlog.debug("partition_sizes_100kb={}", partition_sizes_100kb); + partition_size_map partition_rows{dht::ring_position_less_comparator(*schema)}; + std::vector muts; + auto pk = 0; + for (auto partition_size_100kb : partition_sizes_100kb) { + auto mut_desc = tests::data_model::mutation_description(pkeys.at(pk++).key().explode(*schema)); + for (auto ck = 0; ck < partition_size_100kb; ++ck) { + mut_desc.add_clustered_cell({int32_type->decompose(data_value(ck))}, "v", tests::data_model::mutation_description::value(blob_100kb)); + } + muts.push_back(mut_desc.build(schema)); + partition_rows.emplace(muts.back().decorated_key(), partition_size_100kb); + } + + 
std::ranges::sort(muts, [less = dht::ring_position_less_comparator(*schema)] (const mutation& a, const mutation& b) { + return less(a.decorated_key(), b.decorated_key()); + }); + + auto permit = sem.make_permit(); + + auto mt = make_lw_shared(schema); + for (const auto& mut : muts) { + mt->apply(mut); + } + + auto ms = mutation_source([mt] ( + schema_ptr s, + reader_permit permit, + const dht::partition_range& pr, + const query::partition_slice& ps, + const io_priority_class& pc, + tracing::trace_state_ptr ts, + streamed_mutation::forwarding fwd_ms, + mutation_reader::forwarding fwd_mr) { + return make_restricted_flat_reader(mt->as_data_source(), s, std::move(permit), pr, ps, pc, std::move(ts), fwd_ms, fwd_mr); + }); + auto [staging_reader, staging_reader_handle] = make_manually_paused_evictable_reader( + std::move(ms), + schema, + permit, + query::full_partition_range, + schema->full_slice(), + service::get_local_streaming_priority(), + nullptr, + ::mutation_reader::forwarding::no); + + std::vector collected_muts; + bool failed = false; + + staging_reader.consume_in_thread(db::view::view_updating_consumer(schema, as, staging_reader_handle, + consumer_verifier(schema, permit, partition_rows, collected_muts, failed)), db::no_timeout); + + BOOST_REQUIRE(!failed); + + BOOST_REQUIRE_EQUAL(muts.size(), collected_muts.size()); + for (size_t i = 0; i < muts.size(); ++i) { + testlog.trace("compare mutation {}", i); + BOOST_REQUIRE_EQUAL(muts[i], collected_muts[i]); + } + } +} diff --git a/test/lib/cql_test_env.cc b/test/lib/cql_test_env.cc index c580b2dff5..63e18894e1 100644 --- a/test/lib/cql_test_env.cc +++ b/test/lib/cql_test_env.cc @@ -459,7 +459,11 @@ public: auto stop_storage_service = defer([&ss] { ss.stop().get(); }); database_config dbcfg; - dbcfg.available_memory = memory::stats().total_memory(); + if (cfg_in.dbcfg) { + dbcfg = std::move(*cfg_in.dbcfg); + } else { + dbcfg.available_memory = memory::stats().total_memory(); + } db.start(std::ref(*cfg), dbcfg, 
std::ref(mm_notif), std::ref(feature_service), std::ref(token_metadata), std::ref(abort_sources)).get(); auto stop_db = defer([&db] { db.stop().get(); diff --git a/test/lib/cql_test_env.hh b/test/lib/cql_test_env.hh index c3f0073458..e3d22fe6c7 100644 --- a/test/lib/cql_test_env.hh +++ b/test/lib/cql_test_env.hh @@ -64,6 +64,7 @@ namespace db { class cql_test_config { public: seastar::shared_ptr db_config; + std::optional dbcfg; std::set disabled_features; cql_test_config();