From df1d438fcd9d574062cfcb4ec7ff3ccfd99fc540 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pawe=C5=82=20Dziepak?= Date: Tue, 11 Dec 2018 08:12:54 +0000 Subject: [PATCH] row_cache: drop support for streamed_mutation::forwarding::yes entirely --- read_context.hh | 4 ---- row_cache.cc | 50 +++++++++---------------------------------------- 2 files changed, 9 insertions(+), 45 deletions(-) diff --git a/read_context.hh b/read_context.hh index e13c0a5dd8..0ba29eef64 100644 --- a/read_context.hh +++ b/read_context.hh @@ -125,7 +125,6 @@ class read_context final : public enable_lw_shared_from_this { const query::partition_slice& _slice; const io_priority_class& _pc; tracing::trace_state_ptr _trace_state; - streamed_mutation::forwarding _fwd; mutation_reader::forwarding _fwd_mr; bool _range_query; // When reader enters a partition, it must be set up for reading that @@ -150,7 +149,6 @@ public: const query::partition_slice& slice, const io_priority_class& pc, tracing::trace_state_ptr trace_state, - streamed_mutation::forwarding fwd, mutation_reader::forwarding fwd_mr) : _cache(cache) , _schema(std::move(schema)) @@ -158,7 +156,6 @@ public: , _slice(slice) , _pc(pc) , _trace_state(std::move(trace_state)) - , _fwd(fwd) , _fwd_mr(fwd_mr) , _range_query(!range.is_singular() || !range.start()->value().has_key()) , _underlying(_cache, *this) @@ -184,7 +181,6 @@ public: const query::partition_slice& slice() const { return _slice; } const io_priority_class& pc() const { return _pc; } tracing::trace_state_ptr trace_state() const { return _trace_state; } - streamed_mutation::forwarding fwd() const { return _fwd; } mutation_reader::forwarding fwd_mr() const { return _fwd_mr; } bool is_range_query() const { return _range_query; } autoupdating_underlying_reader& underlying() { return _underlying; } diff --git a/row_cache.cc b/row_cache.cc index 7943013d63..65b662376b 100644 --- a/row_cache.cc +++ b/row_cache.cc @@ -353,10 +353,7 @@ static flat_mutation_reader read_directly_from_underlying(read_context& reader) if (reader.schema()->version() != reader.underlying().underlying().schema()->version()) { res = transform(std::move(res), schema_upgrader(reader.schema())); } - if (reader.fwd() == streamed_mutation::forwarding::no) { - res = make_nonforwardable(std::move(res), true); - } - return std::move(res); + return make_nonforwardable(std::move(res), true); } // Reader which populates the cache using data from the delegate. @@ -442,14 +439,7 @@ public: return make_ready_future<>(); } virtual future<> fast_forward_to(position_range pr, db::timeout_clock::time_point timeout) override { - if (!_reader) { - _end_of_stream = true; - return make_ready_future<>(); - } - assert(bool(_read_context->fwd())); - _end_of_stream = false; - forward_buffer_to(pr.start()); - return _reader->fast_forward_to(std::move(pr), timeout); + throw std::bad_function_call(); } virtual size_t buffer_size() const override { if (_reader) { @@ -680,11 +670,7 @@ private: }); } void on_end_of_stream() { - if (_read_context->fwd() == streamed_mutation::forwarding::yes) { - _end_of_stream = true; - } else { - _reader = {}; - } + _reader = {}; } public: scanning_and_populating_reader(row_cache& cache, @@ -712,17 +698,9 @@ public: }); } virtual void next_partition() override { - if (_read_context->fwd() == streamed_mutation::forwarding::yes) { - if (_reader) { - clear_buffer(); - _reader->next_partition(); - _end_of_stream = false; - } - } else { - clear_buffer_to_next_partition(); - if (_reader && is_buffer_empty()) { - _reader->next_partition(); - } + clear_buffer_to_next_partition(); + if (_reader && is_buffer_empty()) { + _reader->next_partition(); } } virtual future<> fast_forward_to(const dht::partition_range& pr, db::timeout_clock::time_point timeout) override { @@ -737,14 +715,7 @@ public: return make_ready_future<>(); } virtual future<> fast_forward_to(position_range cr, db::timeout_clock::time_point timeout) override { - forward_buffer_to(cr.start()); - if (_reader) { - _end_of_stream = false; - return _reader->fast_forward_to(std::move(cr), timeout); - } else { - _end_of_stream = true; - return make_ready_future<>(); - } + throw std::bad_function_call(); } virtual size_t buffer_size() const override { if (_reader) { @@ -768,7 +739,7 @@ row_cache::make_reader(schema_ptr s, streamed_mutation::forwarding fwd, mutation_reader::forwarding fwd_mr) { - auto ctx = make_lw_shared(*this, s, range, slice, pc, trace_state, streamed_mutation::forwarding::no, fwd_mr); + auto ctx = make_lw_shared(*this, s, range, slice, pc, trace_state, fwd_mr); if (!ctx->is_range_query() && !fwd_mr) { auto mr = _read_section(_tracker.region(), [&] { @@ -1258,10 +1229,7 @@ flat_mutation_reader cache_entry::do_read(row_cache& rc, read_context& reader) { if (reader.schema()->version() != _schema->version()) { r = transform(std::move(r), schema_upgrader(reader.schema())); } - if (reader.fwd() == streamed_mutation::forwarding::yes) { - r = make_forwardable(std::move(r)); - } - return std::move(r); + return r; } const schema_ptr& row_cache::schema() const {