row_cache: drop support for streamed_mutation::forwarding::yes entirely

This commit is contained in:
Paweł Dziepak
2018-12-11 08:12:54 +00:00
parent adcb3ec20c
commit df1d438fcd
2 changed files with 9 additions and 45 deletions

View File

@@ -125,7 +125,6 @@ class read_context final : public enable_lw_shared_from_this<read_context> {
const query::partition_slice& _slice;
const io_priority_class& _pc;
tracing::trace_state_ptr _trace_state;
streamed_mutation::forwarding _fwd;
mutation_reader::forwarding _fwd_mr;
bool _range_query;
// When reader enters a partition, it must be set up for reading that
@@ -150,7 +149,6 @@ public:
const query::partition_slice& slice,
const io_priority_class& pc,
tracing::trace_state_ptr trace_state,
streamed_mutation::forwarding fwd,
mutation_reader::forwarding fwd_mr)
: _cache(cache)
, _schema(std::move(schema))
@@ -158,7 +156,6 @@ public:
, _slice(slice)
, _pc(pc)
, _trace_state(std::move(trace_state))
, _fwd(fwd)
, _fwd_mr(fwd_mr)
, _range_query(!range.is_singular() || !range.start()->value().has_key())
, _underlying(_cache, *this)
@@ -184,7 +181,6 @@ public:
const query::partition_slice& slice() const { return _slice; }
const io_priority_class& pc() const { return _pc; }
tracing::trace_state_ptr trace_state() const { return _trace_state; }
streamed_mutation::forwarding fwd() const { return _fwd; }
mutation_reader::forwarding fwd_mr() const { return _fwd_mr; }
bool is_range_query() const { return _range_query; }
autoupdating_underlying_reader& underlying() { return _underlying; }

View File

@@ -353,10 +353,7 @@ static flat_mutation_reader read_directly_from_underlying(read_context& reader)
if (reader.schema()->version() != reader.underlying().underlying().schema()->version()) {
res = transform(std::move(res), schema_upgrader(reader.schema()));
}
if (reader.fwd() == streamed_mutation::forwarding::no) {
res = make_nonforwardable(std::move(res), true);
}
return std::move(res);
return make_nonforwardable(std::move(res), true);
}
// Reader which populates the cache using data from the delegate.
@@ -442,14 +439,7 @@ public:
return make_ready_future<>();
}
virtual future<> fast_forward_to(position_range pr, db::timeout_clock::time_point timeout) override {
if (!_reader) {
_end_of_stream = true;
return make_ready_future<>();
}
assert(bool(_read_context->fwd()));
_end_of_stream = false;
forward_buffer_to(pr.start());
return _reader->fast_forward_to(std::move(pr), timeout);
throw std::bad_function_call();
}
virtual size_t buffer_size() const override {
if (_reader) {
@@ -680,11 +670,7 @@ private:
});
}
void on_end_of_stream() {
if (_read_context->fwd() == streamed_mutation::forwarding::yes) {
_end_of_stream = true;
} else {
_reader = {};
}
_reader = {};
}
public:
scanning_and_populating_reader(row_cache& cache,
@@ -712,17 +698,9 @@ public:
});
}
virtual void next_partition() override {
if (_read_context->fwd() == streamed_mutation::forwarding::yes) {
if (_reader) {
clear_buffer();
_reader->next_partition();
_end_of_stream = false;
}
} else {
clear_buffer_to_next_partition();
if (_reader && is_buffer_empty()) {
_reader->next_partition();
}
clear_buffer_to_next_partition();
if (_reader && is_buffer_empty()) {
_reader->next_partition();
}
}
virtual future<> fast_forward_to(const dht::partition_range& pr, db::timeout_clock::time_point timeout) override {
@@ -737,14 +715,7 @@ public:
return make_ready_future<>();
}
virtual future<> fast_forward_to(position_range cr, db::timeout_clock::time_point timeout) override {
forward_buffer_to(cr.start());
if (_reader) {
_end_of_stream = false;
return _reader->fast_forward_to(std::move(cr), timeout);
} else {
_end_of_stream = true;
return make_ready_future<>();
}
throw std::bad_function_call();
}
virtual size_t buffer_size() const override {
if (_reader) {
@@ -768,7 +739,7 @@ row_cache::make_reader(schema_ptr s,
streamed_mutation::forwarding fwd,
mutation_reader::forwarding fwd_mr)
{
auto ctx = make_lw_shared<read_context>(*this, s, range, slice, pc, trace_state, streamed_mutation::forwarding::no, fwd_mr);
auto ctx = make_lw_shared<read_context>(*this, s, range, slice, pc, trace_state, fwd_mr);
if (!ctx->is_range_query() && !fwd_mr) {
auto mr = _read_section(_tracker.region(), [&] {
@@ -1258,10 +1229,7 @@ flat_mutation_reader cache_entry::do_read(row_cache& rc, read_context& reader) {
if (reader.schema()->version() != _schema->version()) {
r = transform(std::move(r), schema_upgrader(reader.schema()));
}
if (reader.fwd() == streamed_mutation::forwarding::yes) {
r = make_forwardable(std::move(r));
}
return std::move(r);
return r;
}
const schema_ptr& row_cache::schema() const {