Try to read whole streamed_mutation up to limit

If limit is exceeded then return the streamed_mutation
and don't cache it.

Signed-off-by: Piotr Jastrzebski <piotr@scylladb.com>
This commit is contained in:
Piotr Jastrzebski
2016-07-21 09:35:35 +02:00
parent 0d39bb1ad0
commit 98c12dc2e2
Notes: Pekka Enberg 2016-07-27 14:09:29 +03:00
backport: 1.3

View File

@@ -38,6 +38,22 @@ static logging::logger logger("cache");
thread_local seastar::thread_scheduling_group row_cache::_update_thread_scheduling_group(1ms, 0.2);
enum class is_wide_partition { yes, no };
future<is_wide_partition, mutation_opt> try_to_read(streamed_mutation_opt&& sm) {
if (!sm) {
return make_ready_future<is_wide_partition, mutation_opt>(is_wide_partition::no, mutation_opt());
}
static const size_t max_size_of_cached_partition = 10 * 1024 * 1024;
return mutation_from_streamed_mutation_with_limit(std::move(*sm), max_size_of_cached_partition).then(
[] (mutation_opt&& omo) mutable {
if (omo) {
return make_ready_future<is_wide_partition, mutation_opt>(is_wide_partition::no, std::move(omo));
} else {
return make_ready_future<is_wide_partition, mutation_opt>(is_wide_partition::yes, mutation_opt());
}
});
}
cache_tracker& global_cache_tracker() {
static thread_local cache_tracker instance;
@@ -196,29 +212,48 @@ const logalloc::region& cache_tracker::region() const {
class single_partition_populating_reader final : public mutation_reader::impl {
schema_ptr _schema;
row_cache& _cache;
mutation_source& _underlying;
mutation_reader _delegate;
const io_priority_class _pc;
query::clustering_key_filtering_context _ck_filtering;
public:
single_partition_populating_reader(schema_ptr s, row_cache& cache, mutation_reader delegate, query::clustering_key_filtering_context ck_filtering)
single_partition_populating_reader(schema_ptr s, row_cache& cache, mutation_source& underlying,
mutation_reader delegate, const io_priority_class pc, query::clustering_key_filtering_context ck_filtering)
: _schema(std::move(s))
, _cache(cache)
, _underlying(underlying)
, _delegate(std::move(delegate))
, _pc(pc)
, _ck_filtering(ck_filtering)
{ }
virtual future<streamed_mutation_opt> operator()() override {
return _delegate().then([] (auto sm) {
return mutation_from_streamed_mutation(std::move(sm));
}).then([this, op = _cache._populate_phaser.start()] (mutation_opt&& mo) -> streamed_mutation_opt {
if (mo) {
_cache.populate(*mo);
mo->upgrade(_schema);
auto& ck_ranges = _ck_filtering.get_ranges(mo->key());
auto filtered_partition = mutation_partition(std::move(mo->partition()), *(mo->schema()), ck_ranges);
mo->partition() = std::move(filtered_partition);
return streamed_mutation_from_mutation(std::move(*mo));
auto op = _cache._populate_phaser.start();
return _delegate().then([this, op = std::move(op)] (auto sm) mutable {
if (!sm) {
return make_ready_future<streamed_mutation_opt>(streamed_mutation_opt());
}
return { };
dht::decorated_key dk = sm->decorated_key();
return try_to_read(std::move(sm)).then([this, op = std::move(op), dk = std::move(dk)]
(is_wide_partition wide_partition, mutation_opt&& mo) {
if (wide_partition == is_wide_partition::no) {
if (mo) {
_cache.populate(*mo);
mo->upgrade(_schema);
auto& ck_ranges = _ck_filtering.get_ranges(mo->key());
auto filtered_partition = mutation_partition(std::move(mo->partition()), *(mo->schema()), ck_ranges);
mo->partition() = std::move(filtered_partition);
return make_ready_future<streamed_mutation_opt>(streamed_mutation_from_mutation(std::move(*mo)));
}
return make_ready_future<streamed_mutation_opt>(streamed_mutation_opt());
} else {
auto reader = _underlying(_schema,
query::partition_range::make_singular(dht::ring_position(std::move(dk))),
_ck_filtering,
_pc);
return reader();
}
});
});
}
};
@@ -397,22 +432,36 @@ public:
{}
virtual future<streamed_mutation_opt> operator()() override {
update_reader();
return _reader().then([] (auto sm) {
return mutation_from_streamed_mutation(std::move(sm));
}).then([this, op = _cache._populate_phaser.start()] (mutation_opt&& mo) -> streamed_mutation_opt {
if (mo) {
_cache.populate(*mo);
mo->upgrade(_schema);
maybe_mark_last_entry_as_continuous(mark_end_as_continuous(mark_end_as_continuous::override(), true));
_last_key = dht::ring_position(mo->decorated_key());
_last_key_populate_phase = _cache._populate_phaser.phase();
auto& ck_ranges = _ck_filtering.get_ranges(mo->key());
auto filtered_partition = mutation_partition(std::move(mo->partition()), *(mo->schema()), ck_ranges);
mo->partition() = std::move(filtered_partition);
return streamed_mutation_from_mutation(std::move(*mo));
}
maybe_mark_last_entry_as_continuous(_make_last_entry_continuous);
return {};
auto op = _cache._populate_phaser.start();
return _reader().then([this, op = std::move(op)] (auto sm) mutable {
stdx::optional<dht::decorated_key> dk = (sm) ? stdx::optional<dht::decorated_key>(sm->decorated_key())
: stdx::optional<dht::decorated_key>(stdx::nullopt);
return try_to_read(std::move(sm)).then([this, op = std::move(op), dk = std::move(dk)]
(is_wide_partition wide_partition, mutation_opt&& mo) {
if (wide_partition == is_wide_partition::no) {
if (mo) {
_cache.populate(*mo);
mo->upgrade(_schema);
this->maybe_mark_last_entry_as_continuous(mark_end_as_continuous(mark_end_as_continuous::override(), true));
_last_key = dht::ring_position(mo->decorated_key());
_last_key_populate_phase = _cache._populate_phaser.phase();
auto& ck_ranges = _ck_filtering.get_ranges(mo->key());
auto filtered_partition = mutation_partition(std::move(mo->partition()), *(mo->schema()), ck_ranges);
mo->partition() = std::move(filtered_partition);
return make_ready_future<streamed_mutation_opt>(streamed_mutation_from_mutation(std::move(*mo)));
}
this->maybe_mark_last_entry_as_continuous(_make_last_entry_continuous);
return make_ready_future<streamed_mutation_opt>(streamed_mutation_opt());
} else {
assert(bool(dk));
_last_key = std::experimental::optional<dht::ring_position>();
auto reader = _underlying(_schema,
query::partition_range::make_singular(dht::ring_position(std::move(*dk))),
_ck_filtering,
_pc);
return reader();
}
});
});
}
};
@@ -678,8 +727,8 @@ row_cache::make_reader(schema_ptr s,
return make_reader_returning(e.read(*this, s, ck_filtering));
} else {
on_miss();
return make_mutation_reader<single_partition_populating_reader>(s, *this,
_underlying(_schema, range, query::no_clustering_key_filtering, pc),
return make_mutation_reader<single_partition_populating_reader>(s, *this, _underlying,
_underlying(_schema, range, query::no_clustering_key_filtering, pc), pc,
ck_filtering);
}
});