merge: db/view: view_update_generator: make staging reader evictable
Merged patch set by Botond Dénes: The view update generation process creates two readers. One is used to read the staging sstables, the data which needs view updates to be generated for, and another reader for each processed mutation, which reads the current value (pre-image) of each row in said mutation. The staging reader is created first and is kept alive until all staging data is processed. The pre-image reader is created separately for each processed mutation. The staging reader is not restricted, meaning it does not wait for admission on the relevant reader concurrency semaphore, but it does register its resource usage on it. The pre-image reader however *is* restricted. This creates a situation, where the staging reader possibly consumes all resources from the semaphore, leaving none for the later created pre-image reader, which will not be able to start reading. This will block the view building process meaning that the staging reader will not be destroyed, causing a deadlock. This patch solves this by making the staging reader restricted and making it evictable. To prevent thrashing -- evicting the staging reader after reading only a really small partition -- we only make the staging reader evictable after we have read at least 1MB worth of data from it. test/boost: view_build_test: add test_view_update_generator_buffering test/boost: view_build_test: add test test_view_update_generator_deadlock reader_permit: reader_resources: add operator- and operator+ reader_concurrency_semaphore: add initial_resources() test: cql_test_env: allow overriding database_config mutation_reader: expose new_reader_base_cost db/view: view_updating_consumer: allow passing custom update pusher db/view: view_update_generator: make staging reader evictable db/view: view_updating_consumer: move implementation from table.cc to view.cc database: add make_restricted_range_sstable_reader() Signed-off-by: Botond Dénes <bdenes@scylladb.com> --- db/view/view_updating_consumer.hh | 51 ++++++++++++++++++++++++++++--- db/view/view.cc | 39 +++++++++++++++++------ db/view/view_update_generator.cc | 19 +++++++++--- 3 files changed, 91 insertions(+), 18 deletions(-)
This commit is contained in:
19
database.hh
19
database.hh
@@ -1024,6 +1024,10 @@ flat_mutation_reader make_local_shard_sstable_reader(schema_ptr s,
|
||||
mutation_reader::forwarding fwd_mr,
|
||||
sstables::read_monitor_generator& monitor_generator = sstables::default_read_monitor_generator());
|
||||
|
||||
/// Read a range from the passed-in sstables.
|
||||
///
|
||||
/// The reader is unrestricted, but will account its resource usage on the
|
||||
/// semaphore belonging to the passed-in permit.
|
||||
flat_mutation_reader make_range_sstable_reader(schema_ptr s,
|
||||
reader_permit permit,
|
||||
lw_shared_ptr<sstables::sstable_set> sstables,
|
||||
@@ -1035,6 +1039,21 @@ flat_mutation_reader make_range_sstable_reader(schema_ptr s,
|
||||
mutation_reader::forwarding fwd_mr,
|
||||
sstables::read_monitor_generator& monitor_generator = sstables::default_read_monitor_generator());
|
||||
|
||||
/// Read a range from the passed-in sstables.
|
||||
///
|
||||
/// The reader is restricted, that is it will wait for admission on the semaphore
|
||||
/// belonging to the passed-in permit, before starting to read.
|
||||
flat_mutation_reader make_restricted_range_sstable_reader(schema_ptr s,
|
||||
reader_permit permit,
|
||||
lw_shared_ptr<sstables::sstable_set> sstables,
|
||||
const dht::partition_range& pr,
|
||||
const query::partition_slice& slice,
|
||||
const io_priority_class& pc,
|
||||
tracing::trace_state_ptr trace_state,
|
||||
streamed_mutation::forwarding fwd,
|
||||
mutation_reader::forwarding fwd_mr,
|
||||
sstables::read_monitor_generator& monitor_generator = sstables::default_read_monitor_generator());
|
||||
|
||||
class user_types_metadata;
|
||||
|
||||
class keyspace_metadata final {
|
||||
|
||||
@@ -58,6 +58,7 @@
|
||||
#include "cql3/util.hh"
|
||||
#include "db/view/view.hh"
|
||||
#include "db/view/view_builder.hh"
|
||||
#include "db/view/view_updating_consumer.hh"
|
||||
#include "db/system_keyspace_view_types.hh"
|
||||
#include "db/system_keyspace.hh"
|
||||
#include "frozen_mutation.hh"
|
||||
@@ -1912,5 +1913,48 @@ future<bool> check_needs_view_update_path(db::system_distributed_keyspace& sys_d
|
||||
});
|
||||
}
|
||||
|
||||
const size_t view_updating_consumer::buffer_size_soft_limit{1 * 1024 * 1024};
|
||||
const size_t view_updating_consumer::buffer_size_hard_limit{2 * 1024 * 1024};
|
||||
|
||||
void view_updating_consumer::do_flush_buffer() {
|
||||
_staging_reader_handle.pause();
|
||||
|
||||
if (_buffer.front().partition().empty()) {
|
||||
// If we flushed mid-partition we can have an empty mutation if we
|
||||
// flushed right before getting the end-of-partition fragment.
|
||||
_buffer.pop_front();
|
||||
}
|
||||
|
||||
while (!_buffer.empty()) {
|
||||
try {
|
||||
auto lock_holder = _view_update_pusher(std::move(_buffer.front())).get();
|
||||
} catch (...) {
|
||||
vlogger.warn("Failed to push replica updates for table {}.{}: {}", _schema->ks_name(), _schema->cf_name(), std::current_exception());
|
||||
}
|
||||
_buffer.pop_front();
|
||||
}
|
||||
|
||||
_buffer_size = 0;
|
||||
_m = nullptr;
|
||||
}
|
||||
|
||||
void view_updating_consumer::maybe_flush_buffer_mid_partition() {
|
||||
if (_buffer_size >= buffer_size_hard_limit) {
|
||||
auto m = mutation(_schema, _m->decorated_key(), mutation_partition(_schema));
|
||||
do_flush_buffer();
|
||||
_buffer.emplace_back(std::move(m));
|
||||
_m = &_buffer.back();
|
||||
}
|
||||
}
|
||||
|
||||
view_updating_consumer::view_updating_consumer(schema_ptr schema, table& table, std::vector<sstables::shared_sstable> excluded_sstables, const seastar::abort_source& as,
|
||||
evictable_reader_handle& staging_reader_handle)
|
||||
: view_updating_consumer(std::move(schema), as, staging_reader_handle,
|
||||
[table = table.shared_from_this(), excluded_sstables = std::move(excluded_sstables)] (mutation m) mutable {
|
||||
auto s = m.schema();
|
||||
return table->stream_view_replica_updates(std::move(s), std::move(m), db::no_timeout, excluded_sstables);
|
||||
})
|
||||
{ }
|
||||
|
||||
} // namespace view
|
||||
} // namespace db
|
||||
|
||||
@@ -64,18 +64,29 @@ future<> view_update_generator::start() {
|
||||
ssts->insert(sst);
|
||||
}
|
||||
|
||||
flat_mutation_reader staging_sstable_reader = ::make_range_sstable_reader(s,
|
||||
auto ms = mutation_source([this, ssts] (
|
||||
schema_ptr s,
|
||||
reader_permit permit,
|
||||
const dht::partition_range& pr,
|
||||
const query::partition_slice& ps,
|
||||
const io_priority_class& pc,
|
||||
tracing::trace_state_ptr ts,
|
||||
streamed_mutation::forwarding fwd_ms,
|
||||
mutation_reader::forwarding fwd_mr) {
|
||||
return ::make_restricted_range_sstable_reader(s, std::move(permit), std::move(ssts), pr, ps, pc, std::move(ts), fwd_ms, fwd_mr);
|
||||
});
|
||||
auto [staging_sstable_reader, staging_sstable_reader_handle] = make_manually_paused_evictable_reader(
|
||||
std::move(ms),
|
||||
s,
|
||||
_db.make_query_class_config().semaphore.make_permit(),
|
||||
std::move(ssts),
|
||||
query::full_partition_range,
|
||||
s->full_slice(),
|
||||
service::get_local_streaming_priority(),
|
||||
nullptr,
|
||||
::streamed_mutation::forwarding::no,
|
||||
::mutation_reader::forwarding::no);
|
||||
|
||||
inject_failure("view_update_generator_consume_staging_sstable");
|
||||
auto result = staging_sstable_reader.consume_in_thread(view_updating_consumer(s, *t, sstables, _as), db::no_timeout);
|
||||
auto result = staging_sstable_reader.consume_in_thread(view_updating_consumer(s, *t, sstables, _as, staging_sstable_reader_handle), db::no_timeout);
|
||||
if (result == stop_iteration::yes) {
|
||||
break;
|
||||
}
|
||||
|
||||
@@ -27,6 +27,8 @@
|
||||
#include "sstables/shared_sstable.hh"
|
||||
#include "database.hh"
|
||||
|
||||
class evictable_reader_handle;
|
||||
|
||||
namespace db::view {
|
||||
|
||||
/*
|
||||
@@ -34,22 +36,46 @@ namespace db::view {
|
||||
* It is expected to be run in seastar::async threaded context through consume_in_thread()
|
||||
*/
|
||||
class view_updating_consumer {
|
||||
schema_ptr _schema;
|
||||
lw_shared_ptr<table> _table;
|
||||
std::vector<sstables::shared_sstable> _excluded_sstables;
|
||||
const seastar::abort_source* _as;
|
||||
std::optional<mutation> _m;
|
||||
public:
|
||||
view_updating_consumer(schema_ptr schema, table& table, std::vector<sstables::shared_sstable> excluded_sstables, const seastar::abort_source& as)
|
||||
// We prefer flushing on partition boundaries, so at the end of a partition,
|
||||
// we flush on reaching the soft limit. Otherwise we continue accumulating
|
||||
// data. We flush mid-partition if we reach the hard limit.
|
||||
static const size_t buffer_size_soft_limit;
|
||||
static const size_t buffer_size_hard_limit;
|
||||
|
||||
private:
|
||||
schema_ptr _schema;
|
||||
const seastar::abort_source* _as;
|
||||
evictable_reader_handle& _staging_reader_handle;
|
||||
circular_buffer<mutation> _buffer;
|
||||
mutation* _m{nullptr};
|
||||
size_t _buffer_size{0};
|
||||
noncopyable_function<future<row_locker::lock_holder>(mutation)> _view_update_pusher;
|
||||
|
||||
private:
|
||||
void do_flush_buffer();
|
||||
void maybe_flush_buffer_mid_partition();
|
||||
|
||||
public:
|
||||
// Push updates with a custom pusher. Mainly for tests.
|
||||
view_updating_consumer(schema_ptr schema, const seastar::abort_source& as, evictable_reader_handle& staging_reader_handle,
|
||||
noncopyable_function<future<row_locker::lock_holder>(mutation)> view_update_pusher)
|
||||
: _schema(std::move(schema))
|
||||
, _table(table.shared_from_this())
|
||||
, _excluded_sstables(std::move(excluded_sstables))
|
||||
, _as(&as)
|
||||
, _m()
|
||||
, _staging_reader_handle(staging_reader_handle)
|
||||
, _view_update_pusher(std::move(view_update_pusher))
|
||||
{ }
|
||||
|
||||
view_updating_consumer(schema_ptr schema, table& table, std::vector<sstables::shared_sstable> excluded_sstables, const seastar::abort_source& as,
|
||||
evictable_reader_handle& staging_reader_handle);
|
||||
|
||||
view_updating_consumer(view_updating_consumer&&) = default;
|
||||
|
||||
view_updating_consumer& operator=(view_updating_consumer&&) = delete;
|
||||
|
||||
void consume_new_partition(const dht::decorated_key& dk) {
|
||||
_m = mutation(_schema, dk, mutation_partition(_schema));
|
||||
_buffer.emplace_back(_schema, dk, mutation_partition(_schema));
|
||||
_m = &_buffer.back();
|
||||
}
|
||||
|
||||
void consume(tombstone t) {
|
||||
@@ -60,7 +86,9 @@ public:
|
||||
if (_as->abort_requested()) {
|
||||
return stop_iteration::yes;
|
||||
}
|
||||
_buffer_size += sr.memory_usage(*_schema);
|
||||
_m->partition().apply(*_schema, std::move(sr));
|
||||
maybe_flush_buffer_mid_partition();
|
||||
return stop_iteration::no;
|
||||
}
|
||||
|
||||
@@ -68,7 +96,9 @@ public:
|
||||
if (_as->abort_requested()) {
|
||||
return stop_iteration::yes;
|
||||
}
|
||||
_buffer_size += cr.memory_usage(*_schema);
|
||||
_m->partition().apply(*_schema, std::move(cr));
|
||||
maybe_flush_buffer_mid_partition();
|
||||
return stop_iteration::no;
|
||||
}
|
||||
|
||||
@@ -76,14 +106,27 @@ public:
|
||||
if (_as->abort_requested()) {
|
||||
return stop_iteration::yes;
|
||||
}
|
||||
_buffer_size += rt.memory_usage(*_schema);
|
||||
_m->partition().apply(*_schema, std::move(rt));
|
||||
maybe_flush_buffer_mid_partition();
|
||||
return stop_iteration::no;
|
||||
}
|
||||
|
||||
// Expected to be run in seastar::async threaded context (consume_in_thread())
|
||||
stop_iteration consume_end_of_partition();
|
||||
stop_iteration consume_end_of_partition() {
|
||||
if (_as->abort_requested()) {
|
||||
return stop_iteration::yes;
|
||||
}
|
||||
if (_buffer_size >= buffer_size_soft_limit) {
|
||||
do_flush_buffer();
|
||||
}
|
||||
return stop_iteration::no;
|
||||
}
|
||||
|
||||
stop_iteration consume_end_of_stream() {
|
||||
if (!_buffer.empty()) {
|
||||
do_flush_buffer();
|
||||
}
|
||||
return stop_iteration(_as->abort_requested());
|
||||
}
|
||||
};
|
||||
|
||||
@@ -659,6 +659,8 @@ flat_mutation_reader make_combined_reader(schema_ptr schema,
|
||||
return make_combined_reader(std::move(schema), std::move(v), fwd_sm, fwd_mr);
|
||||
}
|
||||
|
||||
const ssize_t new_reader_base_cost{16 * 1024};
|
||||
|
||||
class restricting_mutation_reader : public flat_mutation_reader::impl {
|
||||
struct mutation_source_and_params {
|
||||
mutation_source _ms;
|
||||
@@ -685,8 +687,6 @@ class restricting_mutation_reader : public flat_mutation_reader::impl {
|
||||
};
|
||||
std::variant<pending_state, admitted_state> _state;
|
||||
|
||||
static const ssize_t new_reader_base_cost{16 * 1024};
|
||||
|
||||
template<typename Function>
|
||||
requires std::is_move_constructible<Function>::value
|
||||
&& requires(Function fn, flat_mutation_reader& reader) {
|
||||
|
||||
@@ -304,6 +304,8 @@ public:
|
||||
mutation_source make_empty_mutation_source();
|
||||
snapshot_source make_empty_snapshot_source();
|
||||
|
||||
extern const ssize_t new_reader_base_cost;
|
||||
|
||||
// Creates a restricted reader whose resource usages will be tracked
|
||||
// during it's lifetime. If there are not enough resources (dues to
|
||||
// existing readers) to create the new reader, it's construction will
|
||||
|
||||
@@ -105,6 +105,7 @@ private:
|
||||
};
|
||||
|
||||
private:
|
||||
const resources _initial_resources;
|
||||
resources _resources;
|
||||
|
||||
expiring_fifo<entry, expiry_handler, db::timeout_clock> _wait_list;
|
||||
@@ -135,7 +136,8 @@ public:
|
||||
sstring name,
|
||||
size_t max_queue_length = std::numeric_limits<size_t>::max(),
|
||||
std::function<void()> prethrow_action = nullptr)
|
||||
: _resources(count, memory)
|
||||
: _initial_resources(count, memory)
|
||||
, _resources(count, memory)
|
||||
, _wait_list(expiry_handler(name))
|
||||
, _name(std::move(name))
|
||||
, _max_queue_length(max_queue_length)
|
||||
@@ -193,6 +195,10 @@ public:
|
||||
|
||||
reader_permit make_permit();
|
||||
|
||||
const resources initial_resources() const {
|
||||
return _initial_resources;
|
||||
}
|
||||
|
||||
const resources available_resources() const {
|
||||
return _resources;
|
||||
}
|
||||
|
||||
@@ -42,12 +42,20 @@ struct reader_resources {
|
||||
return count >= other.count && memory >= other.memory;
|
||||
}
|
||||
|
||||
reader_resources operator-(const reader_resources& other) const {
|
||||
return reader_resources{count - other.count, memory - other.memory};
|
||||
}
|
||||
|
||||
reader_resources& operator-=(const reader_resources& other) {
|
||||
count -= other.count;
|
||||
memory -= other.memory;
|
||||
return *this;
|
||||
}
|
||||
|
||||
reader_resources operator+(const reader_resources& other) const {
|
||||
return reader_resources{count + other.count, memory + other.memory};
|
||||
}
|
||||
|
||||
reader_resources& operator+=(const reader_resources& other) {
|
||||
count += other.count;
|
||||
memory += other.memory;
|
||||
|
||||
40
table.cc
40
table.cc
@@ -23,7 +23,6 @@
|
||||
#include "sstables/sstables.hh"
|
||||
#include "sstables/sstables_manager.hh"
|
||||
#include "service/priority_manager.hh"
|
||||
#include "db/view/view_updating_consumer.hh"
|
||||
#include "db/schema_tables.hh"
|
||||
#include "cell_locking.hh"
|
||||
#include "mutation_fragment.hh"
|
||||
@@ -327,6 +326,32 @@ flat_mutation_reader make_range_sstable_reader(schema_ptr s,
|
||||
fwd_mr);
|
||||
}
|
||||
|
||||
flat_mutation_reader make_restricted_range_sstable_reader(schema_ptr s,
|
||||
reader_permit permit,
|
||||
lw_shared_ptr<sstables::sstable_set> sstables,
|
||||
const dht::partition_range& pr,
|
||||
const query::partition_slice& slice,
|
||||
const io_priority_class& pc,
|
||||
tracing::trace_state_ptr trace_state,
|
||||
streamed_mutation::forwarding fwd,
|
||||
mutation_reader::forwarding fwd_mr,
|
||||
sstables::read_monitor_generator& monitor_generator)
|
||||
{
|
||||
auto ms = mutation_source([sstables=std::move(sstables), &monitor_generator] (
|
||||
schema_ptr s,
|
||||
reader_permit permit,
|
||||
const dht::partition_range& pr,
|
||||
const query::partition_slice& slice,
|
||||
const io_priority_class& pc,
|
||||
tracing::trace_state_ptr trace_state,
|
||||
streamed_mutation::forwarding fwd,
|
||||
mutation_reader::forwarding fwd_mr) {
|
||||
return make_range_sstable_reader(std::move(s), std::move(permit), std::move(sstables), pr, slice, pc,
|
||||
std::move(trace_state), fwd, fwd_mr, monitor_generator);
|
||||
});
|
||||
return make_restricted_flat_reader(std::move(ms), std::move(s), std::move(permit), pr, slice, pc, std::move(trace_state), fwd, fwd_mr);
|
||||
}
|
||||
|
||||
flat_mutation_reader
|
||||
table::make_sstable_reader(schema_ptr s,
|
||||
reader_permit permit,
|
||||
@@ -2279,16 +2304,3 @@ table::as_mutation_source_excluding(std::vector<sstables::shared_sstable>& ssts)
|
||||
return this->make_reader_excluding_sstables(std::move(s), std::move(permit), ssts, range, slice, pc, std::move(trace_state), fwd, fwd_mr);
|
||||
});
|
||||
}
|
||||
|
||||
stop_iteration db::view::view_updating_consumer::consume_end_of_partition() {
|
||||
if (_as->abort_requested()) {
|
||||
return stop_iteration::yes;
|
||||
}
|
||||
try {
|
||||
auto lock_holder = _table->stream_view_replica_updates(_schema, std::move(*_m), db::no_timeout, _excluded_sstables).get();
|
||||
} catch (...) {
|
||||
tlogger.warn("Failed to push replica updates for table {}.{}: {}", _schema->ks_name(), _schema->cf_name(), std::current_exception());
|
||||
}
|
||||
_m.reset();
|
||||
return stop_iteration::no;
|
||||
}
|
||||
|
||||
@@ -870,8 +870,6 @@ SEASTAR_TEST_CASE(reader_selector_fast_forwarding_test) {
|
||||
});
|
||||
}
|
||||
|
||||
static const std::size_t new_reader_base_cost{16 * 1024};
|
||||
|
||||
sstables::shared_sstable create_sstable(sstables::test_env& env, simple_schema& sschema, const sstring& path) {
|
||||
std::vector<mutation> mutations;
|
||||
mutations.reserve(1 << 14);
|
||||
|
||||
@@ -27,12 +27,16 @@
|
||||
#include "db/config.hh"
|
||||
|
||||
#include <seastar/testing/test_case.hh>
|
||||
#include <seastar/testing/thread_test_case.hh>
|
||||
#include "test/lib/cql_test_env.hh"
|
||||
#include "test/lib/cql_assertions.hh"
|
||||
#include "test/lib/sstable_utils.hh"
|
||||
#include "schema_builder.hh"
|
||||
#include "service/priority_manager.hh"
|
||||
#include "test/lib/test_services.hh"
|
||||
#include "test/lib/data_model.hh"
|
||||
#include "test/lib/log.hh"
|
||||
#include "utils/ranges.hh"
|
||||
|
||||
using namespace std::literals::chrono_literals;
|
||||
|
||||
@@ -495,3 +499,257 @@ SEASTAR_TEST_CASE(test_view_update_generator) {
|
||||
BOOST_REQUIRE_EQUAL(view_update_generator.available_register_units(), db::view::view_update_generator::registration_queue_size);
|
||||
});
|
||||
}
|
||||
|
||||
SEASTAR_THREAD_TEST_CASE(test_view_update_generator_deadlock) {
|
||||
cql_test_config test_cfg;
|
||||
auto& db_cfg = *test_cfg.db_config;
|
||||
|
||||
db_cfg.enable_cache(false);
|
||||
db_cfg.enable_commitlog(false);
|
||||
|
||||
test_cfg.dbcfg.emplace();
|
||||
test_cfg.dbcfg->available_memory = memory::stats().total_memory();
|
||||
test_cfg.dbcfg->statement_scheduling_group = seastar::create_scheduling_group("statement", 1000).get0();
|
||||
test_cfg.dbcfg->streaming_scheduling_group = seastar::create_scheduling_group("streaming", 200).get0();
|
||||
|
||||
do_with_cql_env([] (cql_test_env& e) -> future<> {
|
||||
e.execute_cql("create table t (p text, c text, v text, primary key (p, c))").get();
|
||||
e.execute_cql("create materialized view tv as select * from t "
|
||||
"where p is not null and c is not null and v is not null "
|
||||
"primary key (v, c, p)").get();
|
||||
|
||||
auto msb = e.local_db().get_config().murmur3_partitioner_ignore_msb_bits();
|
||||
auto key1 = token_generation_for_shard(1, this_shard_id(), msb).front().first;
|
||||
|
||||
for (auto i = 0; i < 1024; ++i) {
|
||||
e.execute_cql(fmt::format("insert into t (p, c, v) values ('{}', 'c{}', 'x')", key1, i)).get();
|
||||
}
|
||||
|
||||
// We need data on the disk so that the pre-image reader is forced to go to disk.
|
||||
e.db().invoke_on_all([] (database& db) {
|
||||
return db.flush_all_memtables();
|
||||
}).get();
|
||||
|
||||
auto& view_update_generator = e.local_view_update_generator();
|
||||
auto s = test_table_schema();
|
||||
|
||||
lw_shared_ptr<table> t = e.local_db().find_column_family("ks", "t").shared_from_this();
|
||||
|
||||
auto key = partition_key::from_exploded(*s, {to_bytes(key1)});
|
||||
mutation m(s, key);
|
||||
auto col = s->get_column_definition("v");
|
||||
const auto filler_val_size = 4 * 1024;
|
||||
const auto filler_val = sstring(filler_val_size, 'a');
|
||||
for (int i = 0; i < 1024; ++i) {
|
||||
auto& row = m.partition().clustered_row(*s, clustering_key::from_exploded(*s, {to_bytes(fmt::format("c{}", i))}));
|
||||
row.cells().apply(*col, atomic_cell::make_live(*col->type, 2345, col->type->decompose(filler_val)));
|
||||
}
|
||||
|
||||
auto sst = t->make_streaming_staging_sstable();
|
||||
sstables::sstable_writer_config sst_cfg = test_sstables_manager.configure_writer();
|
||||
auto& pc = service::get_local_streaming_priority();
|
||||
|
||||
sst->write_components(flat_mutation_reader_from_mutations({m}), 1ul, s, sst_cfg, {}, pc).get();
|
||||
sst->open_data().get();
|
||||
t->add_sstable_and_update_cache(sst).get();
|
||||
|
||||
auto& sem = *with_scheduling_group(e.local_db().get_streaming_scheduling_group(), [&] () {
|
||||
return &e.local_db().make_query_class_config().semaphore;
|
||||
}).get0();
|
||||
|
||||
// consume all units except what is needed to admit a single reader.
|
||||
sem.consume(sem.initial_resources() - reader_resources{1, new_reader_base_cost});
|
||||
|
||||
testlog.info("res = [.count={}, .memory={}]", sem.available_resources().count, sem.available_resources().memory);
|
||||
|
||||
BOOST_REQUIRE_EQUAL(sem.get_inactive_read_stats().permit_based_evictions, 0);
|
||||
|
||||
view_update_generator.register_staging_sstable(sst, t).get();
|
||||
|
||||
eventually_true([&] {
|
||||
return sem.get_inactive_read_stats().permit_based_evictions > 0;
|
||||
});
|
||||
|
||||
return make_ready_future<>();
|
||||
}, std::move(test_cfg)).get();
|
||||
}
|
||||
|
||||
SEASTAR_THREAD_TEST_CASE(test_view_update_generator_buffering) {
|
||||
using partition_size_map = std::map<dht::decorated_key, size_t, dht::ring_position_less_comparator>;
|
||||
|
||||
class consumer_verifier {
|
||||
schema_ptr _schema;
|
||||
reader_permit _permit;
|
||||
const partition_size_map& _partition_rows;
|
||||
std::vector<mutation>& _collected_muts;
|
||||
bool& _failed;
|
||||
std::unique_ptr<row_locker> _rl;
|
||||
std::unique_ptr<row_locker::stats> _rl_stats;
|
||||
clustering_key::less_compare _less_cmp;
|
||||
const size_t _max_rows_soft;
|
||||
const size_t _max_rows_hard;
|
||||
size_t _buffer_rows = 0;
|
||||
|
||||
private:
|
||||
static size_t rows_in_limit(size_t l) {
|
||||
const size_t _100kb = 100 * 1024;
|
||||
// round up
|
||||
return l / _100kb + std::min(size_t(1), l % _100kb);
|
||||
}
|
||||
|
||||
static size_t rows_in_mut(const mutation& m) {
|
||||
return std::distance(m.partition().clustered_rows().begin(), m.partition().clustered_rows().end());
|
||||
}
|
||||
|
||||
void check(mutation mut) {
|
||||
// First we check that we would be able to create a reader, even
|
||||
// though the staging reader consumed all resources.
|
||||
auto fut = _permit.wait_admission(new_reader_base_cost, db::timeout_clock::now());
|
||||
BOOST_REQUIRE(!fut.failed());
|
||||
auto res_units = fut.get0();
|
||||
|
||||
const size_t current_rows = rows_in_mut(mut);
|
||||
const auto total_rows = _partition_rows.at(mut.decorated_key());
|
||||
_buffer_rows += current_rows;
|
||||
|
||||
testlog.trace("consumer_verifier::check(): key={}, rows={}/{}, _buffer={}",
|
||||
partition_key::with_schema_wrapper(*_schema, mut.key()),
|
||||
current_rows,
|
||||
total_rows,
|
||||
_buffer_rows);
|
||||
|
||||
BOOST_REQUIRE(current_rows);
|
||||
BOOST_REQUIRE(current_rows <= _max_rows_hard);
|
||||
BOOST_REQUIRE(_buffer_rows <= _max_rows_hard);
|
||||
|
||||
// The current partition doesn't have all of its rows yet, verify
|
||||
// that the new mutation contains the next rows for the same
|
||||
// partition
|
||||
if (!_collected_muts.empty() && rows_in_mut(_collected_muts.back()) < _partition_rows.at(_collected_muts.back().decorated_key())) {
|
||||
BOOST_REQUIRE(_collected_muts.back().decorated_key().equal(*mut.schema(), mut.decorated_key()));
|
||||
const auto& previous_ckey = (--_collected_muts.back().partition().clustered_rows().end())->key();
|
||||
const auto& next_ckey = mut.partition().clustered_rows().begin()->key();
|
||||
BOOST_REQUIRE(_less_cmp(previous_ckey, next_ckey));
|
||||
mutation_application_stats stats;
|
||||
_collected_muts.back().partition().apply(*_schema, mut.partition(), *mut.schema(), stats);
|
||||
// The new mutation is a new partition.
|
||||
} else {
|
||||
if (!_collected_muts.empty()) {
|
||||
BOOST_REQUIRE(!_collected_muts.back().decorated_key().equal(*mut.schema(), mut.decorated_key()));
|
||||
}
|
||||
_collected_muts.push_back(std::move(mut));
|
||||
}
|
||||
|
||||
if (_buffer_rows >= _max_rows_hard) { // buffer flushed on hard limit
|
||||
_buffer_rows = 0;
|
||||
testlog.trace("consumer_verifier::check(): buffer ends on hard limit");
|
||||
} else if (_buffer_rows >= _max_rows_soft) { // buffer flushed on soft limit
|
||||
_buffer_rows = 0;
|
||||
testlog.trace("consumer_verifier::check(): buffer ends on soft limit");
|
||||
}
|
||||
}
|
||||
|
||||
public:
|
||||
consumer_verifier(schema_ptr schema, reader_permit permit, const partition_size_map& partition_rows, std::vector<mutation>& collected_muts, bool& failed)
|
||||
: _schema(std::move(schema))
|
||||
, _permit(std::move(permit))
|
||||
, _partition_rows(partition_rows)
|
||||
, _collected_muts(collected_muts)
|
||||
, _failed(failed)
|
||||
, _rl(std::make_unique<row_locker>(_schema))
|
||||
, _rl_stats(std::make_unique<row_locker::stats>())
|
||||
, _less_cmp(*_schema)
|
||||
, _max_rows_soft(rows_in_limit(db::view::view_updating_consumer::buffer_size_soft_limit))
|
||||
, _max_rows_hard(rows_in_limit(db::view::view_updating_consumer::buffer_size_hard_limit))
|
||||
{ }
|
||||
|
||||
future<row_locker::lock_holder> operator()(mutation mut) {
|
||||
try {
|
||||
check(std::move(mut));
|
||||
} catch (...) {
|
||||
testlog.error("consumer_verifier::operator(): caught unexpected exception {}", std::current_exception());
|
||||
_failed |= true;
|
||||
}
|
||||
return _rl->lock_pk(_collected_muts.back().decorated_key(), true, db::no_timeout, *_rl_stats);
|
||||
}
|
||||
};
|
||||
|
||||
reader_concurrency_semaphore sem(1, new_reader_base_cost, get_name());
|
||||
|
||||
auto schema = schema_builder("ks", "cf")
|
||||
.with_column("pk", int32_type, column_kind::partition_key)
|
||||
.with_column("ck", int32_type, column_kind::clustering_key)
|
||||
.with_column("v", bytes_type)
|
||||
.build();
|
||||
|
||||
const auto blob_100kb = bytes(100 * 1024, bytes::value_type(0xab));
|
||||
const abort_source as;
|
||||
|
||||
const auto partition_size_sets = std::vector<std::vector<int>>{{12}, {8, 4}, {8, 16}, {22}, {8, 8, 8, 8}, {8, 8, 8, 16, 8}, {8, 20, 16, 16}, {50}, {21}, {21, 2}};
|
||||
const auto max_partition_set_size = std::ranges::max_element(partition_size_sets, [] (const std::vector<int>& a, const std::vector<int>& b) { return a.size() < b.size(); })->size();
|
||||
auto pkeys = ranges::to<std::vector<dht::decorated_key>>(std::views::iota(size_t{0}, max_partition_set_size) | std::views::transform([schema] (int i) {
|
||||
return dht::decorate_key(*schema, partition_key::from_single_value(*schema, int32_type->decompose(data_value(i))));
|
||||
}));
|
||||
std::ranges::sort(pkeys, dht::ring_position_less_comparator(*schema));
|
||||
|
||||
for (auto partition_sizes_100kb : partition_size_sets) {
|
||||
testlog.debug("partition_sizes_100kb={}", partition_sizes_100kb);
|
||||
partition_size_map partition_rows{dht::ring_position_less_comparator(*schema)};
|
||||
std::vector<mutation> muts;
|
||||
auto pk = 0;
|
||||
for (auto partition_size_100kb : partition_sizes_100kb) {
|
||||
auto mut_desc = tests::data_model::mutation_description(pkeys.at(pk++).key().explode(*schema));
|
||||
for (auto ck = 0; ck < partition_size_100kb; ++ck) {
|
||||
mut_desc.add_clustered_cell({int32_type->decompose(data_value(ck))}, "v", tests::data_model::mutation_description::value(blob_100kb));
|
||||
}
|
||||
muts.push_back(mut_desc.build(schema));
|
||||
partition_rows.emplace(muts.back().decorated_key(), partition_size_100kb);
|
||||
}
|
||||
|
||||
std::ranges::sort(muts, [less = dht::ring_position_less_comparator(*schema)] (const mutation& a, const mutation& b) {
|
||||
return less(a.decorated_key(), b.decorated_key());
|
||||
});
|
||||
|
||||
auto permit = sem.make_permit();
|
||||
|
||||
auto mt = make_lw_shared<memtable>(schema);
|
||||
for (const auto& mut : muts) {
|
||||
mt->apply(mut);
|
||||
}
|
||||
|
||||
auto ms = mutation_source([mt] (
|
||||
schema_ptr s,
|
||||
reader_permit permit,
|
||||
const dht::partition_range& pr,
|
||||
const query::partition_slice& ps,
|
||||
const io_priority_class& pc,
|
||||
tracing::trace_state_ptr ts,
|
||||
streamed_mutation::forwarding fwd_ms,
|
||||
mutation_reader::forwarding fwd_mr) {
|
||||
return make_restricted_flat_reader(mt->as_data_source(), s, std::move(permit), pr, ps, pc, std::move(ts), fwd_ms, fwd_mr);
|
||||
});
|
||||
auto [staging_reader, staging_reader_handle] = make_manually_paused_evictable_reader(
|
||||
std::move(ms),
|
||||
schema,
|
||||
permit,
|
||||
query::full_partition_range,
|
||||
schema->full_slice(),
|
||||
service::get_local_streaming_priority(),
|
||||
nullptr,
|
||||
::mutation_reader::forwarding::no);
|
||||
|
||||
std::vector<mutation> collected_muts;
|
||||
bool failed = false;
|
||||
|
||||
staging_reader.consume_in_thread(db::view::view_updating_consumer(schema, as, staging_reader_handle,
|
||||
consumer_verifier(schema, permit, partition_rows, collected_muts, failed)), db::no_timeout);
|
||||
|
||||
BOOST_REQUIRE(!failed);
|
||||
|
||||
BOOST_REQUIRE_EQUAL(muts.size(), collected_muts.size());
|
||||
for (size_t i = 0; i < muts.size(); ++i) {
|
||||
testlog.trace("compare mutation {}", i);
|
||||
BOOST_REQUIRE_EQUAL(muts[i], collected_muts[i]);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -459,7 +459,11 @@ public:
|
||||
auto stop_storage_service = defer([&ss] { ss.stop().get(); });
|
||||
|
||||
database_config dbcfg;
|
||||
dbcfg.available_memory = memory::stats().total_memory();
|
||||
if (cfg_in.dbcfg) {
|
||||
dbcfg = std::move(*cfg_in.dbcfg);
|
||||
} else {
|
||||
dbcfg.available_memory = memory::stats().total_memory();
|
||||
}
|
||||
db.start(std::ref(*cfg), dbcfg, std::ref(mm_notif), std::ref(feature_service), std::ref(token_metadata), std::ref(abort_sources)).get();
|
||||
auto stop_db = defer([&db] {
|
||||
db.stop().get();
|
||||
|
||||
@@ -64,6 +64,7 @@ namespace db {
|
||||
class cql_test_config {
|
||||
public:
|
||||
seastar::shared_ptr<db::config> db_config;
|
||||
std::optional<database_config> dbcfg;
|
||||
std::set<sstring> disabled_features;
|
||||
|
||||
cql_test_config();
|
||||
|
||||
Reference in New Issue
Block a user