// Patch summary: partitions read by the cache-populating readers are now
// materialized through try_to_read(), which passes a limit of 10 * 1024 * 1024
// to mutation_from_streamed_mutation_with_limit(). When that yields no mutation
// the readers skip _cache.populate() and instead return the result of a new
// singular-range reader opened on _underlying.
// NOTE(review): template argument lists appear to have been stripped from this
// patch text (bare `future`, `make_ready_future`, `stdx::optional`,
// `make_mutation_reader`); restore them from the original commit before applying.
diff --git a/row_cache.cc b/row_cache.cc
index 40b10c57a2..382457e8ec 100644
--- a/row_cache.cc
+++ b/row_cache.cc
@@ -38,6 +38,22 @@ static logging::logger logger("cache");
 
 thread_local seastar::thread_scheduling_group row_cache::_update_thread_scheduling_group(1ms, 0.2);
 
+enum class is_wide_partition { yes, no }; // Result tag of try_to_read(): `yes` is returned together with a disengaged mutation_opt.
+// Turns an optional streamed_mutation into (is_wide_partition, mutation_opt); see the callers below for how the pair is consumed.
+future try_to_read(streamed_mutation_opt&& sm) { // NOTE(review): free function without `static` or an anonymous namespace - confirm external linkage is intended.
+    if (!sm) { // No partition supplied: reported as not wide, with a disengaged mutation_opt.
+        return make_ready_future(is_wide_partition::no, mutation_opt());
+    }
+    static const size_t max_size_of_cached_partition = 10 * 1024 * 1024; // Limit handed to the helper below; presumably bytes (10 MiB) - TODO confirm.
+    return mutation_from_streamed_mutation_with_limit(std::move(*sm), max_size_of_cached_partition).then(
+        [] (mutation_opt&& omo) mutable { // NOTE(review): nothing is captured, so `mutable` has no effect here.
+            if (omo) { // Engaged result: forward the materialized mutation as not wide.
+                return make_ready_future(is_wide_partition::no, std::move(omo));
+            } else { // Disengaged result is treated as a wide partition; presumably the helper gives up once the limit is exceeded - confirm.
+                return make_ready_future(is_wide_partition::yes, mutation_opt());
+            }
+        });
+}
 
 cache_tracker& global_cache_tracker() {
     static thread_local cache_tracker instance;
@@ -196,29 +212,48 @@ const logalloc::region& cache_tracker::region() const {
 class single_partition_populating_reader final : public mutation_reader::impl {
     schema_ptr _schema;
     row_cache& _cache;
+    mutation_source& _underlying; // Held by reference; invoked in operator()() to open a second reader for a wide partition.
     mutation_reader _delegate;
+    const io_priority_class _pc; // Forwarded to _underlying() when a wide partition is re-read.
     query::clustering_key_filtering_context _ck_filtering;
 public:
-    single_partition_populating_reader(schema_ptr s, row_cache& cache, mutation_reader delegate, query::clustering_key_filtering_context ck_filtering)
+    single_partition_populating_reader(schema_ptr s, row_cache& cache, mutation_source& underlying, // `underlying` is stored as a reference, not copied.
+        mutation_reader delegate, const io_priority_class pc, query::clustering_key_filtering_context ck_filtering)
         : _schema(std::move(s))
         , _cache(cache)
+        , _underlying(underlying)
         , _delegate(std::move(delegate))
+        , _pc(pc)
         , _ck_filtering(ck_filtering)
     { }
 
     virtual future operator()() override {
-        return _delegate().then([] (auto sm) {
-            return mutation_from_streamed_mutation(std::move(sm));
-        }).then([this, op = _cache._populate_phaser.start()] (mutation_opt&& mo) -> streamed_mutation_opt {
-            if (mo) {
-                _cache.populate(*mo);
-                mo->upgrade(_schema);
-                auto& ck_ranges = _ck_filtering.get_ranges(mo->key());
-                auto filtered_partition = mutation_partition(std::move(mo->partition()), *(mo->schema()), ck_ranges);
-                mo->partition() = std::move(filtered_partition);
-                return streamed_mutation_from_mutation(std::move(*mo));
+        auto op = _cache._populate_phaser.start(); // Phaser operation handle; moved into both continuations below.
+        return _delegate().then([this, op = std::move(op)] (auto sm) mutable { // `mutable` lets `op` be moved on into the inner lambda.
+            if (!sm) { // Delegate produced no partition: return a disengaged streamed_mutation_opt.
+                return make_ready_future(streamed_mutation_opt());
             }
-            return { };
+            dht::decorated_key dk = sm->decorated_key(); // Copy the key before `sm` is moved into try_to_read(); the wide branch needs it.
+            return try_to_read(std::move(sm)).then([this, op = std::move(op), dk = std::move(dk)] // NOTE(review): this lambda is not `mutable`, so `dk` is const inside it and std::move(dk) below copies.
+                (is_wide_partition wide_partition, mutation_opt&& mo) {
+                if (wide_partition == is_wide_partition::no) { // Not wide: same populate-then-filter steps as the removed code.
+                    if (mo) {
+                        _cache.populate(*mo); // populate() is called before the clustering filter below is applied to `mo`.
+                        mo->upgrade(_schema);
+                        auto& ck_ranges = _ck_filtering.get_ranges(mo->key());
+                        auto filtered_partition = mutation_partition(std::move(mo->partition()), *(mo->schema()), ck_ranges);
+                        mo->partition() = std::move(filtered_partition);
+                        return make_ready_future(streamed_mutation_from_mutation(std::move(*mo)));
+                    }
+                    return make_ready_future(streamed_mutation_opt());
+                } else { // Wide: no _cache.populate(); read the single key again from _underlying with the caller's clustering filter.
+                    auto reader = _underlying(_schema, // NOTE(review): `reader` is a local and the range argument is a temporary - confirm both outlive the future returned by reader().
+                                              query::partition_range::make_singular(dht::ring_position(std::move(dk))),
+                                              _ck_filtering,
+                                              _pc);
+                    return reader();
+                }
+            });
         });
     }
 };
@@ -397,22 +432,36 @@ public:
     {}
     virtual future operator()() override {
         update_reader();
-        return _reader().then([] (auto sm) {
-            return mutation_from_streamed_mutation(std::move(sm));
-        }).then([this, op = _cache._populate_phaser.start()] (mutation_opt&& mo) -> streamed_mutation_opt {
-            if (mo) {
-                _cache.populate(*mo);
-                mo->upgrade(_schema);
-                maybe_mark_last_entry_as_continuous(mark_end_as_continuous(mark_end_as_continuous::override(), true));
-                _last_key = dht::ring_position(mo->decorated_key());
-                _last_key_populate_phase = _cache._populate_phaser.phase();
-                auto& ck_ranges = _ck_filtering.get_ranges(mo->key());
-                auto filtered_partition = mutation_partition(std::move(mo->partition()), *(mo->schema()), ck_ranges);
-                mo->partition() = std::move(filtered_partition);
-                return streamed_mutation_from_mutation(std::move(*mo));
-            }
-            maybe_mark_last_entry_as_continuous(_make_last_entry_continuous);
-            return {};
+        auto op = _cache._populate_phaser.start(); // Phaser operation handle; moved into both continuations below.
+        return _reader().then([this, op = std::move(op)] (auto sm) mutable { // `mutable` lets `op` be moved on into the inner lambda.
+            stdx::optional dk = (sm) ? stdx::optional(sm->decorated_key()) // A disengaged `sm` is still passed to try_to_read() here, so the key is optional.
+                                     : stdx::optional(stdx::nullopt);
+            return try_to_read(std::move(sm)).then([this, op = std::move(op), dk = std::move(dk)] // NOTE(review): lambda is not `mutable`, so std::move(*dk) below copies the key.
+                (is_wide_partition wide_partition, mutation_opt&& mo) {
+                if (wide_partition == is_wide_partition::no) { // Not wide: same steps as the removed code.
+                    if (mo) {
+                        _cache.populate(*mo);
+                        mo->upgrade(_schema);
+                        this->maybe_mark_last_entry_as_continuous(mark_end_as_continuous(mark_end_as_continuous::override(), true)); // `this->` is new; presumably needed for lookup inside the generic lambda - confirm.
+                        _last_key = dht::ring_position(mo->decorated_key());
+                        _last_key_populate_phase = _cache._populate_phaser.phase();
+                        auto& ck_ranges = _ck_filtering.get_ranges(mo->key());
+                        auto filtered_partition = mutation_partition(std::move(mo->partition()), *(mo->schema()), ck_ranges);
+                        mo->partition() = std::move(filtered_partition);
+                        return make_ready_future(streamed_mutation_from_mutation(std::move(*mo)));
+                    }
+                    this->maybe_mark_last_entry_as_continuous(_make_last_entry_continuous); // No partition: same handling as the removed code.
+                    return make_ready_future(streamed_mutation_opt());
+                } else {
+                    assert(bool(dk)); // try_to_read() returns `yes` only after an engaged `sm`, so the key was captured above.
+                    _last_key = std::experimental::optional(); // Wide path assigns an empty optional to _last_key; the non-wide path stores the partition key instead.
+                    auto reader = _underlying(_schema, // NOTE(review): local `reader` plus temporary range argument - confirm both outlive the future returned by reader().
+                                              query::partition_range::make_singular(dht::ring_position(std::move(*dk))),
+                                              _ck_filtering,
+                                              _pc);
+                    return reader();
+                }
+            });
         });
     }
 };
@@ -678,8 +727,8 @@ row_cache::make_reader(schema_ptr s,
                 return make_reader_returning(e.read(*this, s, ck_filtering));
             } else {
                 on_miss();
-                return make_mutation_reader(s, *this,
-                    _underlying(_schema, range, query::no_clustering_key_filtering, pc),
+                return make_mutation_reader(s, *this, _underlying, // The mutation_source itself is now passed in addition to the delegate reader built from it.
+                    _underlying(_schema, range, query::no_clustering_key_filtering, pc), pc, // Delegate still reads with no clustering filter; `pc` is additionally handed to the reader.
                     ck_filtering);
             }
         });