Try to read whole streamed_mutation up to limit
If limit is exceeded then return the streamed_mutation and don't cache it. Signed-off-by: Piotr Jastrzebski <piotr@scylladb.com>
This commit is contained in:
Notes:
Pekka Enberg
2016-07-27 14:09:29 +03:00
backport: 1.3
109
row_cache.cc
109
row_cache.cc
@@ -38,6 +38,22 @@ static logging::logger logger("cache");
|
||||
|
||||
thread_local seastar::thread_scheduling_group row_cache::_update_thread_scheduling_group(1ms, 0.2);
|
||||
|
||||
enum class is_wide_partition { yes, no };
|
||||
|
||||
future<is_wide_partition, mutation_opt> try_to_read(streamed_mutation_opt&& sm) {
|
||||
if (!sm) {
|
||||
return make_ready_future<is_wide_partition, mutation_opt>(is_wide_partition::no, mutation_opt());
|
||||
}
|
||||
static const size_t max_size_of_cached_partition = 10 * 1024 * 1024;
|
||||
return mutation_from_streamed_mutation_with_limit(std::move(*sm), max_size_of_cached_partition).then(
|
||||
[] (mutation_opt&& omo) mutable {
|
||||
if (omo) {
|
||||
return make_ready_future<is_wide_partition, mutation_opt>(is_wide_partition::no, std::move(omo));
|
||||
} else {
|
||||
return make_ready_future<is_wide_partition, mutation_opt>(is_wide_partition::yes, mutation_opt());
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
cache_tracker& global_cache_tracker() {
|
||||
static thread_local cache_tracker instance;
|
||||
@@ -196,29 +212,48 @@ const logalloc::region& cache_tracker::region() const {
|
||||
class single_partition_populating_reader final : public mutation_reader::impl {
|
||||
schema_ptr _schema;
|
||||
row_cache& _cache;
|
||||
mutation_source& _underlying;
|
||||
mutation_reader _delegate;
|
||||
const io_priority_class _pc;
|
||||
query::clustering_key_filtering_context _ck_filtering;
|
||||
public:
|
||||
single_partition_populating_reader(schema_ptr s, row_cache& cache, mutation_reader delegate, query::clustering_key_filtering_context ck_filtering)
|
||||
single_partition_populating_reader(schema_ptr s, row_cache& cache, mutation_source& underlying,
|
||||
mutation_reader delegate, const io_priority_class pc, query::clustering_key_filtering_context ck_filtering)
|
||||
: _schema(std::move(s))
|
||||
, _cache(cache)
|
||||
, _underlying(underlying)
|
||||
, _delegate(std::move(delegate))
|
||||
, _pc(pc)
|
||||
, _ck_filtering(ck_filtering)
|
||||
{ }
|
||||
|
||||
virtual future<streamed_mutation_opt> operator()() override {
|
||||
return _delegate().then([] (auto sm) {
|
||||
return mutation_from_streamed_mutation(std::move(sm));
|
||||
}).then([this, op = _cache._populate_phaser.start()] (mutation_opt&& mo) -> streamed_mutation_opt {
|
||||
if (mo) {
|
||||
_cache.populate(*mo);
|
||||
mo->upgrade(_schema);
|
||||
auto& ck_ranges = _ck_filtering.get_ranges(mo->key());
|
||||
auto filtered_partition = mutation_partition(std::move(mo->partition()), *(mo->schema()), ck_ranges);
|
||||
mo->partition() = std::move(filtered_partition);
|
||||
return streamed_mutation_from_mutation(std::move(*mo));
|
||||
auto op = _cache._populate_phaser.start();
|
||||
return _delegate().then([this, op = std::move(op)] (auto sm) mutable {
|
||||
if (!sm) {
|
||||
return make_ready_future<streamed_mutation_opt>(streamed_mutation_opt());
|
||||
}
|
||||
return { };
|
||||
dht::decorated_key dk = sm->decorated_key();
|
||||
return try_to_read(std::move(sm)).then([this, op = std::move(op), dk = std::move(dk)]
|
||||
(is_wide_partition wide_partition, mutation_opt&& mo) {
|
||||
if (wide_partition == is_wide_partition::no) {
|
||||
if (mo) {
|
||||
_cache.populate(*mo);
|
||||
mo->upgrade(_schema);
|
||||
auto& ck_ranges = _ck_filtering.get_ranges(mo->key());
|
||||
auto filtered_partition = mutation_partition(std::move(mo->partition()), *(mo->schema()), ck_ranges);
|
||||
mo->partition() = std::move(filtered_partition);
|
||||
return make_ready_future<streamed_mutation_opt>(streamed_mutation_from_mutation(std::move(*mo)));
|
||||
}
|
||||
return make_ready_future<streamed_mutation_opt>(streamed_mutation_opt());
|
||||
} else {
|
||||
auto reader = _underlying(_schema,
|
||||
query::partition_range::make_singular(dht::ring_position(std::move(dk))),
|
||||
_ck_filtering,
|
||||
_pc);
|
||||
return reader();
|
||||
}
|
||||
});
|
||||
});
|
||||
}
|
||||
};
|
||||
@@ -397,22 +432,36 @@ public:
|
||||
{}
|
||||
virtual future<streamed_mutation_opt> operator()() override {
|
||||
update_reader();
|
||||
return _reader().then([] (auto sm) {
|
||||
return mutation_from_streamed_mutation(std::move(sm));
|
||||
}).then([this, op = _cache._populate_phaser.start()] (mutation_opt&& mo) -> streamed_mutation_opt {
|
||||
if (mo) {
|
||||
_cache.populate(*mo);
|
||||
mo->upgrade(_schema);
|
||||
maybe_mark_last_entry_as_continuous(mark_end_as_continuous(mark_end_as_continuous::override(), true));
|
||||
_last_key = dht::ring_position(mo->decorated_key());
|
||||
_last_key_populate_phase = _cache._populate_phaser.phase();
|
||||
auto& ck_ranges = _ck_filtering.get_ranges(mo->key());
|
||||
auto filtered_partition = mutation_partition(std::move(mo->partition()), *(mo->schema()), ck_ranges);
|
||||
mo->partition() = std::move(filtered_partition);
|
||||
return streamed_mutation_from_mutation(std::move(*mo));
|
||||
}
|
||||
maybe_mark_last_entry_as_continuous(_make_last_entry_continuous);
|
||||
return {};
|
||||
auto op = _cache._populate_phaser.start();
|
||||
return _reader().then([this, op = std::move(op)] (auto sm) mutable {
|
||||
stdx::optional<dht::decorated_key> dk = (sm) ? stdx::optional<dht::decorated_key>(sm->decorated_key())
|
||||
: stdx::optional<dht::decorated_key>(stdx::nullopt);
|
||||
return try_to_read(std::move(sm)).then([this, op = std::move(op), dk = std::move(dk)]
|
||||
(is_wide_partition wide_partition, mutation_opt&& mo) {
|
||||
if (wide_partition == is_wide_partition::no) {
|
||||
if (mo) {
|
||||
_cache.populate(*mo);
|
||||
mo->upgrade(_schema);
|
||||
this->maybe_mark_last_entry_as_continuous(mark_end_as_continuous(mark_end_as_continuous::override(), true));
|
||||
_last_key = dht::ring_position(mo->decorated_key());
|
||||
_last_key_populate_phase = _cache._populate_phaser.phase();
|
||||
auto& ck_ranges = _ck_filtering.get_ranges(mo->key());
|
||||
auto filtered_partition = mutation_partition(std::move(mo->partition()), *(mo->schema()), ck_ranges);
|
||||
mo->partition() = std::move(filtered_partition);
|
||||
return make_ready_future<streamed_mutation_opt>(streamed_mutation_from_mutation(std::move(*mo)));
|
||||
}
|
||||
this->maybe_mark_last_entry_as_continuous(_make_last_entry_continuous);
|
||||
return make_ready_future<streamed_mutation_opt>(streamed_mutation_opt());
|
||||
} else {
|
||||
assert(bool(dk));
|
||||
_last_key = std::experimental::optional<dht::ring_position>();
|
||||
auto reader = _underlying(_schema,
|
||||
query::partition_range::make_singular(dht::ring_position(std::move(*dk))),
|
||||
_ck_filtering,
|
||||
_pc);
|
||||
return reader();
|
||||
}
|
||||
});
|
||||
});
|
||||
}
|
||||
};
|
||||
@@ -678,8 +727,8 @@ row_cache::make_reader(schema_ptr s,
|
||||
return make_reader_returning(e.read(*this, s, ck_filtering));
|
||||
} else {
|
||||
on_miss();
|
||||
return make_mutation_reader<single_partition_populating_reader>(s, *this,
|
||||
_underlying(_schema, range, query::no_clustering_key_filtering, pc),
|
||||
return make_mutation_reader<single_partition_populating_reader>(s, *this, _underlying,
|
||||
_underlying(_schema, range, query::no_clustering_key_filtering, pc), pc,
|
||||
ck_filtering);
|
||||
}
|
||||
});
|
||||
|
||||
Reference in New Issue
Block a user