diff --git a/database.cc b/database.cc index 5f265cec49..bb2f025fdf 100644 --- a/database.cc +++ b/database.cc @@ -2355,6 +2355,9 @@ database::setup_metrics() { sm::description("Counts sstables that survived the clustering key filtering. " "High value indicates that bloom filter is not very efficient and still have to access a lot of sstables to get data.")), + sm::make_derive("dropped_view_updates", _cf_stats.dropped_view_updates, + sm::description("Counts the number of view updates that have been dropped due to cluster overload. ")), + sm::make_derive("total_writes", _stats->total_writes, sm::description("Counts the total number of successful write operations performed by this shard.")), @@ -4512,10 +4515,8 @@ future<> table::generate_and_propagate_view_updates(const schema_ptr& base, std::move(views), flat_mutation_reader_from_mutations({std::move(m)}), std::move(existings)).then([this, timeout, base_token = std::move(base_token)] (std::vector&& updates) mutable { - return seastar::get_units(*_config.view_update_concurrency_semaphore, memory_usage_of(updates), timeout).then( - [this, base_token = std::move(base_token), updates = std::move(updates)] (db::timeout_semaphore_units units) mutable { - db::view::mutate_MV(std::move(base_token), std::move(updates), _view_stats, std::move(units)).handle_exception([] (auto ignored) { }); - }); + auto units = seastar::consume_units(*_config.view_update_concurrency_semaphore, memory_usage_of(updates)); + db::view::mutate_MV(std::move(base_token), std::move(updates), _view_stats, std::move(units)).handle_exception([] (auto ignored) { }); }); } diff --git a/database.hh b/database.hh index 7e25ffb921..9a05ccea47 100644 --- a/database.hh +++ b/database.hh @@ -280,6 +280,9 @@ struct cf_stats { int64_t clustering_filter_fast_path_count = 0; // how many sstables survived the clustering key checks int64_t surviving_sstables_after_clustering_filter = 0; + + // How many view updates were dropped due to overload. + int64_t dropped_view_updates = 0; }; class table; diff --git a/table.cc b/table.cc index c79ac6046c..b027c21466 100644 --- a/table.cc +++ b/table.cc @@ -71,6 +71,15 @@ future table::push_view_replica_updates(const schema_pt } future table::do_push_view_replica_updates(const schema_ptr& s, mutation&& m, db::timeout_clock::time_point timeout, mutation_source&& source) const { + if (!_config.view_update_concurrency_semaphore->current()) { + // We don't have resources to generate view updates for this write. If we reached this point, we failed to + // throttle the client. The memory queue is already full, waiting on the semaphore would cause this node to + // run out of memory, and generating hints would ultimately result in the disk queue being full too. We don't + // drop the base write, which could create inconsistencies between base replicas. So we dolefully continue, + // and note the fact we dropped a view update. + ++_config.cf_stats->dropped_view_updates; + return make_ready_future(); + } auto& base = schema(); m.upgrade(base); auto views = affected_views(base, m);