Allow reading exactly desired byte ranges and fast_forward_to
In commit c63e88d556, support was added for
fast_forward_to() in data_consume_rows(). Because an input stream's end
cannot be changed after creation, that patch ignores the specified end
byte, and uses the end of file as the end position of the stream.
As result of this, even when we want to read a specific byte range (e.g.,
in the repair code to checksum the partitions in a given range), the code
reads an entire 128K buffer around the end byte, or significantly more, with
read-ahead enabled. This causes repair to do more than 10 times the amount
of I/O it really has to do in the checksumming phase (which in the current
implementation, reads small ranges of partitions at a time).
This patch has two levels:
1. In the lower level, sstable::data_consume_rows(), which reads all
partitions in a given disk byte range, now gets another byte position,
"last_end". That can be the range's end, the end of the file, or anything
in between the two. It opens the disk stream until last_end, which means
1. we will never read-ahead beyond last_end, and 2. fast_fordward_to() is
not allowed beyond last_end.
2. In the upper level, we add to the various layers of sstable readers,
mutation readers, etc., a boolean flag mutation_reader::forwarding, which
says whether fast_forward_to() is allowed on the stream of mutations to
move the stream to a different partition range.
Note that this flag is separate from the existing boolean flag
streamed_mutation::fowarding - that one talks about skipping inside a
single partition, while the flag we are adding is about switching the
partition range being read. Most of the functions that previously
accepted streamed_mutation::forwarding now accept *also* the option
mutation_reader::forwarding. The exception are functions which are known
to read only a single partition, and not support fast_forward_to() a
different partition range.
We note that if mutation_reader::forwarding::no is requested, and
fast_forward_to() is forbidden, there is no point in reading anything
beyond the range's end, so data_consume_rows() is called with last_end as
the range's end. But if forwarding::yes is requested, we use the end of the
file as last_end, exactly like the code before this patch did.
Importantly, we note that the repair's partition reading code,
column_family::make_streaming_reader, uses mutation_reader::forwarding::no,
while the other existing reading code will use the default forwarding::yes.
In the future, we can further optimize the amount of bytes read from disk
by replacing forwarding::yes by an actual last partition that may ever be
read, and use its byte position as the last_end passed to data_consume_rows.
But we don't do this yet, and it's not a regression from the existing code,
which also opened the file input stream until the end of the file, and not
until the end of the range query. Moreover, such an improvement will not
improve of anything if the overall range is always very large, in which
case not over-reading at its end will not improve performance.
Signed-off-by: Nadav Har'El <nyh@scylladb.com>
Message-Id: <20170619152629.11703-1-nyh@scylladb.com>
This commit is contained in:
47
database.cc
47
database.cc
@@ -180,8 +180,9 @@ column_family::sstables_as_mutation_source() {
|
||||
const query::partition_slice& slice,
|
||||
const io_priority_class& pc,
|
||||
tracing::trace_state_ptr trace_state,
|
||||
streamed_mutation::forwarding fwd) {
|
||||
return make_sstable_reader(std::move(s), r, slice, pc, std::move(trace_state), fwd);
|
||||
streamed_mutation::forwarding fwd,
|
||||
mutation_reader::forwarding fwd_mr) {
|
||||
return make_sstable_reader(std::move(s), r, slice, pc, std::move(trace_state), fwd, fwd_mr);
|
||||
});
|
||||
}
|
||||
|
||||
@@ -380,12 +381,13 @@ class range_sstable_reader final : public combined_mutation_reader {
|
||||
tracing::trace_state_ptr _trace_state;
|
||||
const query::partition_slice& _slice;
|
||||
streamed_mutation::forwarding _fwd;
|
||||
mutation_reader::forwarding _fwd_mr;
|
||||
private:
|
||||
std::unique_ptr<mutation_reader> create_reader(sstables::shared_sstable sst) {
|
||||
tracing::trace(_trace_state, "Reading partition range {} from sstable {}", *_pr, seastar::value_of([&sst] { return sst->get_filename(); }));
|
||||
// FIXME: make sstable::read_range_rows() return ::mutation_reader so that we can drop this wrapper.
|
||||
mutation_reader reader =
|
||||
make_mutation_reader<sstable_range_wrapping_reader>(sst, _s, *_pr, _slice, _pc, _fwd);
|
||||
make_mutation_reader<sstable_range_wrapping_reader>(sst, _s, *_pr, _slice, _pc, _fwd, _fwd_mr);
|
||||
if (sst->is_shared()) {
|
||||
reader = make_filtering_reader(std::move(reader), belongs_to_current_shard);
|
||||
}
|
||||
@@ -398,7 +400,8 @@ public:
|
||||
const query::partition_slice& slice,
|
||||
const io_priority_class& pc,
|
||||
tracing::trace_state_ptr trace_state,
|
||||
streamed_mutation::forwarding fwd)
|
||||
streamed_mutation::forwarding fwd,
|
||||
mutation_reader::forwarding fwd_mr)
|
||||
: _s(s)
|
||||
, _pr(&pr)
|
||||
, _sstables(std::move(sstables))
|
||||
@@ -406,6 +409,7 @@ public:
|
||||
, _trace_state(std::move(trace_state))
|
||||
, _slice(slice)
|
||||
, _fwd(fwd)
|
||||
, _fwd_mr(fwd_mr)
|
||||
{
|
||||
auto ssts = _sstables->select(pr);
|
||||
std::vector<mutation_reader*> readers;
|
||||
@@ -528,7 +532,8 @@ column_family::make_sstable_reader(schema_ptr s,
|
||||
const query::partition_slice& slice,
|
||||
const io_priority_class& pc,
|
||||
tracing::trace_state_ptr trace_state,
|
||||
streamed_mutation::forwarding fwd) const {
|
||||
streamed_mutation::forwarding fwd,
|
||||
mutation_reader::forwarding fwd_mr) const {
|
||||
// restricts a reader's concurrency if the configuration specifies it
|
||||
auto restrict_reader = [&] (mutation_reader&& in) {
|
||||
auto&& config = [this, &pc] () -> const restricted_mutation_reader_config& {
|
||||
@@ -544,7 +549,7 @@ column_family::make_sstable_reader(schema_ptr s,
|
||||
}
|
||||
};
|
||||
|
||||
if (pr.is_singular() && pr.start()->value().has_key()) {
|
||||
if (pr.is_singular() && pr.start()->value().has_key() && !fwd_mr) {
|
||||
const dht::ring_position& pos = pr.start()->value();
|
||||
if (dht::shard_of(pos.token()) != engine().cpu_id()) {
|
||||
return make_empty_reader(); // range doesn't belong to this shard
|
||||
@@ -553,7 +558,7 @@ column_family::make_sstable_reader(schema_ptr s,
|
||||
_stats.estimated_sstable_per_read, pr, slice, pc, std::move(trace_state), fwd));
|
||||
} else {
|
||||
// range_sstable_reader is not movable so we need to wrap it
|
||||
return restrict_reader(make_mutation_reader<range_sstable_reader>(std::move(s), _sstables, pr, slice, pc, std::move(trace_state), fwd));
|
||||
return restrict_reader(make_mutation_reader<range_sstable_reader>(std::move(s), _sstables, pr, slice, pc, std::move(trace_state), fwd, fwd_mr));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -601,9 +606,10 @@ column_family::make_reader(schema_ptr s,
|
||||
const query::partition_slice& slice,
|
||||
const io_priority_class& pc,
|
||||
tracing::trace_state_ptr trace_state,
|
||||
streamed_mutation::forwarding fwd) const {
|
||||
streamed_mutation::forwarding fwd,
|
||||
mutation_reader::forwarding fwd_mr) const {
|
||||
if (_virtual_reader) {
|
||||
return (*_virtual_reader)(s, range, slice, pc, trace_state, fwd);
|
||||
return (*_virtual_reader)(s, range, slice, pc, trace_state, fwd, fwd_mr);
|
||||
}
|
||||
|
||||
std::vector<mutation_reader> readers;
|
||||
@@ -630,13 +636,13 @@ column_family::make_reader(schema_ptr s,
|
||||
// https://github.com/scylladb/scylla/issues/185
|
||||
|
||||
for (auto&& mt : *_memtables) {
|
||||
readers.emplace_back(mt->make_reader(s, range, slice, pc, trace_state, fwd));
|
||||
readers.emplace_back(mt->make_reader(s, range, slice, pc, trace_state, fwd, fwd_mr));
|
||||
}
|
||||
|
||||
if (_config.enable_cache) {
|
||||
readers.emplace_back(_cache.make_reader(s, range, slice, pc, std::move(trace_state), fwd));
|
||||
readers.emplace_back(_cache.make_reader(s, range, slice, pc, std::move(trace_state), fwd, fwd_mr));
|
||||
} else {
|
||||
readers.emplace_back(make_sstable_reader(s, range, slice, pc, std::move(trace_state), fwd));
|
||||
readers.emplace_back(make_sstable_reader(s, range, slice, pc, std::move(trace_state), fwd, fwd_mr));
|
||||
}
|
||||
|
||||
return make_combined_reader(std::move(readers));
|
||||
@@ -652,10 +658,10 @@ column_family::make_streaming_reader(schema_ptr s,
|
||||
readers.reserve(_memtables->size() + 1);
|
||||
|
||||
for (auto&& mt : *_memtables) {
|
||||
readers.emplace_back(mt->make_reader(s, range, slice, pc, nullptr, streamed_mutation::forwarding::no));
|
||||
readers.emplace_back(mt->make_reader(s, range, slice, pc, nullptr, streamed_mutation::forwarding::no, mutation_reader::forwarding::no));
|
||||
}
|
||||
|
||||
readers.emplace_back(make_sstable_reader(s, range, slice, pc, nullptr, streamed_mutation::forwarding::no));
|
||||
readers.emplace_back(make_sstable_reader(s, range, slice, pc, nullptr, streamed_mutation::forwarding::no, mutation_reader::forwarding::no));
|
||||
|
||||
return make_combined_reader(std::move(readers));
|
||||
}
|
||||
@@ -667,17 +673,17 @@ column_family::make_streaming_reader(schema_ptr s,
|
||||
auto& pc = service::get_local_streaming_read_priority();
|
||||
|
||||
auto source = mutation_source([this] (schema_ptr s, const dht::partition_range& range, const query::partition_slice& slice,
|
||||
const io_priority_class& pc, tracing::trace_state_ptr trace_state, streamed_mutation::forwarding fwd) {
|
||||
const io_priority_class& pc, tracing::trace_state_ptr trace_state, streamed_mutation::forwarding fwd, mutation_reader::forwarding fwd_mr) {
|
||||
std::vector<mutation_reader> readers;
|
||||
readers.reserve(_memtables->size() + 1);
|
||||
for (auto&& mt : *_memtables) {
|
||||
readers.emplace_back(mt->make_reader(s, range, slice, pc, trace_state, fwd));
|
||||
readers.emplace_back(mt->make_reader(s, range, slice, pc, trace_state, fwd, fwd_mr));
|
||||
}
|
||||
readers.emplace_back(make_sstable_reader(s, range, slice, pc, std::move(trace_state), fwd));
|
||||
readers.emplace_back(make_sstable_reader(s, range, slice, pc, std::move(trace_state), fwd, fwd_mr));
|
||||
return make_combined_reader(std::move(readers));
|
||||
});
|
||||
|
||||
return make_multi_range_reader(s, std::move(source), ranges, slice, pc, nullptr, streamed_mutation::forwarding::no);
|
||||
return make_multi_range_reader(s, std::move(source), ranges, slice, pc, nullptr, streamed_mutation::forwarding::no, mutation_reader::forwarding::no);
|
||||
}
|
||||
|
||||
future<std::vector<locked_cell>> column_family::lock_counter_cells(const mutation& m, timeout_clock::time_point timeout) {
|
||||
@@ -2782,8 +2788,9 @@ column_family::as_mutation_source() const {
|
||||
const query::partition_slice& slice,
|
||||
const io_priority_class& pc,
|
||||
tracing::trace_state_ptr trace_state,
|
||||
streamed_mutation::forwarding fwd) {
|
||||
return this->make_reader(std::move(s), range, slice, pc, std::move(trace_state));
|
||||
streamed_mutation::forwarding fwd,
|
||||
mutation_reader::forwarding fwd_mr) {
|
||||
return this->make_reader(std::move(s), range, slice, pc, std::move(trace_state), fwd, fwd_mr);
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
@@ -628,7 +628,8 @@ private:
|
||||
const query::partition_slice& slice,
|
||||
const io_priority_class& pc,
|
||||
tracing::trace_state_ptr trace_state,
|
||||
streamed_mutation::forwarding fwd) const;
|
||||
streamed_mutation::forwarding fwd,
|
||||
mutation_reader::forwarding fwd_mr) const;
|
||||
|
||||
mutation_source sstables_as_mutation_source();
|
||||
partition_presence_checker make_partition_presence_checker(lw_shared_ptr<sstables::sstable_set>);
|
||||
@@ -679,7 +680,8 @@ public:
|
||||
const query::partition_slice& slice = query::full_slice,
|
||||
const io_priority_class& pc = default_priority_class(),
|
||||
tracing::trace_state_ptr trace_state = nullptr,
|
||||
streamed_mutation::forwarding fwd = streamed_mutation::forwarding::no) const;
|
||||
streamed_mutation::forwarding fwd = streamed_mutation::forwarding::no,
|
||||
mutation_reader::forwarding fwd_mr = mutation_reader::forwarding::yes) const;
|
||||
|
||||
// The streaming mutation reader differs from the regular mutation reader in that:
|
||||
// - Reflects all writes accepted by replica prior to creation of the
|
||||
|
||||
@@ -274,7 +274,8 @@ struct virtual_reader {
|
||||
const query::partition_slice& slice,
|
||||
const io_priority_class& pc,
|
||||
tracing::trace_state_ptr trace_state,
|
||||
streamed_mutation::forwarding fwd) {
|
||||
streamed_mutation::forwarding fwd,
|
||||
mutation_reader::forwarding fwd_mr) {
|
||||
return make_mutation_reader<size_estimates_mutation_reader>(schema, range, slice, fwd);
|
||||
}
|
||||
};
|
||||
|
||||
24
memtable.cc
24
memtable.cc
@@ -247,8 +247,9 @@ protected:
|
||||
mutation_reader delegate_reader(const dht::partition_range& delegate,
|
||||
const query::partition_slice& slice,
|
||||
const io_priority_class& pc,
|
||||
streamed_mutation::forwarding fwd) {
|
||||
auto ret = (*_memtable->_underlying)(_schema, delegate, slice, pc, nullptr, fwd);
|
||||
streamed_mutation::forwarding fwd,
|
||||
mutation_reader::forwarding fwd_mr) {
|
||||
auto ret = (*_memtable->_underlying)(_schema, delegate, slice, pc, nullptr, fwd, fwd_mr);
|
||||
_memtable = {};
|
||||
_last = {};
|
||||
return ret;
|
||||
@@ -267,17 +268,20 @@ class scanning_reader final: public iterator_reader {
|
||||
const io_priority_class& _pc;
|
||||
const query::partition_slice& _slice;
|
||||
streamed_mutation::forwarding _fwd;
|
||||
mutation_reader::forwarding _fwd_mr;
|
||||
public:
|
||||
scanning_reader(schema_ptr s,
|
||||
lw_shared_ptr<memtable> m,
|
||||
const dht::partition_range& range,
|
||||
const query::partition_slice& slice,
|
||||
const io_priority_class& pc,
|
||||
streamed_mutation::forwarding fwd)
|
||||
streamed_mutation::forwarding fwd,
|
||||
mutation_reader::forwarding fwd_mr)
|
||||
: iterator_reader(std::move(s), std::move(m), range)
|
||||
, _pc(pc)
|
||||
, _slice(slice)
|
||||
, _fwd(fwd)
|
||||
, _fwd_mr(fwd_mr)
|
||||
{ }
|
||||
|
||||
virtual future<streamed_mutation_opt> operator()() override {
|
||||
@@ -288,7 +292,7 @@ public:
|
||||
// FIXME: Use cache. See column_family::make_reader().
|
||||
_delegate_range = get_delegate_range();
|
||||
if (_delegate_range) {
|
||||
_delegate = delegate_reader(*_delegate_range, _slice, _pc, _fwd);
|
||||
_delegate = delegate_reader(*_delegate_range, _slice, _pc, _fwd, _fwd_mr);
|
||||
return _delegate();
|
||||
}
|
||||
|
||||
@@ -427,7 +431,8 @@ memtable::make_reader(schema_ptr s,
|
||||
const query::partition_slice& slice,
|
||||
const io_priority_class& pc,
|
||||
tracing::trace_state_ptr trace_state_ptr,
|
||||
streamed_mutation::forwarding fwd) {
|
||||
streamed_mutation::forwarding fwd,
|
||||
mutation_reader::forwarding fwd_mr) {
|
||||
if (query::is_single_partition(range)) {
|
||||
const query::ring_position& pos = range.start()->value();
|
||||
return _read_section(*this, [&] {
|
||||
@@ -441,7 +446,7 @@ memtable::make_reader(schema_ptr s,
|
||||
}
|
||||
});
|
||||
} else {
|
||||
return make_mutation_reader<scanning_reader>(std::move(s), shared_from_this(), range, slice, pc, fwd);
|
||||
return make_mutation_reader<scanning_reader>(std::move(s), shared_from_this(), range, slice, pc, fwd, fwd_mr);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -451,7 +456,7 @@ memtable::make_flush_reader(schema_ptr s, const io_priority_class& pc) {
|
||||
return make_mutation_reader<flush_reader>(std::move(s), shared_from_this());
|
||||
} else {
|
||||
return make_mutation_reader<scanning_reader>(std::move(s), shared_from_this(),
|
||||
query::full_partition_range, query::full_slice, pc, streamed_mutation::forwarding::no);
|
||||
query::full_partition_range, query::full_slice, pc, streamed_mutation::forwarding::no, mutation_reader::forwarding::no);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -510,8 +515,9 @@ mutation_source memtable::as_data_source() {
|
||||
const query::partition_slice& slice,
|
||||
const io_priority_class& pc,
|
||||
tracing::trace_state_ptr trace_state,
|
||||
streamed_mutation::forwarding fwd) {
|
||||
return mt->make_reader(std::move(s), range, slice, pc, std::move(trace_state), fwd);
|
||||
streamed_mutation::forwarding fwd,
|
||||
mutation_reader::forwarding fwd_mr) {
|
||||
return mt->make_reader(std::move(s), range, slice, pc, std::move(trace_state), fwd, fwd_mr);
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
@@ -182,7 +182,8 @@ public:
|
||||
const query::partition_slice& slice = query::full_slice,
|
||||
const io_priority_class& pc = default_priority_class(),
|
||||
tracing::trace_state_ptr trace_state_ptr = nullptr,
|
||||
streamed_mutation::forwarding fwd = streamed_mutation::forwarding::no);
|
||||
streamed_mutation::forwarding fwd = streamed_mutation::forwarding::no,
|
||||
mutation_reader::forwarding fwd_mr = mutation_reader::forwarding::no);
|
||||
|
||||
|
||||
mutation_reader make_flush_reader(schema_ptr, const io_priority_class& pc);
|
||||
|
||||
@@ -281,10 +281,11 @@ private:
|
||||
public:
|
||||
multi_range_mutation_reader(schema_ptr s, mutation_source source, const ranges_vector& ranges,
|
||||
const query::partition_slice& slice, const io_priority_class& pc,
|
||||
tracing::trace_state_ptr trace_state, streamed_mutation::forwarding fwd)
|
||||
tracing::trace_state_ptr trace_state, streamed_mutation::forwarding fwd,
|
||||
mutation_reader::forwarding fwd_mr)
|
||||
: _ranges(ranges)
|
||||
, _current_range(_ranges.begin())
|
||||
, _reader(source(s, *_current_range, slice, pc, trace_state, fwd))
|
||||
, _reader(source(s, *_current_range, slice, pc, trace_state, fwd, fwd_mr))
|
||||
{
|
||||
}
|
||||
|
||||
@@ -316,8 +317,9 @@ public:
|
||||
mutation_reader
|
||||
make_multi_range_reader(schema_ptr s, mutation_source source, const dht::partition_range_vector& ranges,
|
||||
const query::partition_slice& slice, const io_priority_class& pc,
|
||||
tracing::trace_state_ptr trace_state, streamed_mutation::forwarding fwd)
|
||||
tracing::trace_state_ptr trace_state, streamed_mutation::forwarding fwd,
|
||||
mutation_reader::forwarding fwd_mr)
|
||||
{
|
||||
return make_mutation_reader<multi_range_mutation_reader>(std::move(s), std::move(source), ranges,
|
||||
slice, pc, std::move(trace_state), fwd);
|
||||
slice, pc, std::move(trace_state), fwd, fwd_mr);
|
||||
}
|
||||
|
||||
@@ -50,6 +50,20 @@
|
||||
// not be the optimal object to use here.
|
||||
class mutation_reader final {
|
||||
public:
|
||||
// mutation_reader::forwarding determines whether fast_forward_to() may
|
||||
// be used on the mutation reader to change the partition range being
|
||||
// read. Enabling forwarding also changes read policy: forwarding::no
|
||||
// means we will stop reading from disk at the end of the given range,
|
||||
// but with forwarding::yes we may read ahead, anticipating the user to
|
||||
// make a small skip with fast_forward_to() and continuing to read.
|
||||
//
|
||||
// Note that mutation_reader::forwarding is similarly name but different
|
||||
// from streamed_mutation::forwarding - the former is about skipping to
|
||||
// a different partition range, while the latter is about skipping
|
||||
// inside a large partition.
|
||||
class forwarding_tag;
|
||||
using forwarding = bool_class<forwarding_tag>;
|
||||
|
||||
class impl {
|
||||
public:
|
||||
virtual ~impl() {}
|
||||
@@ -259,7 +273,8 @@ class mutation_source {
|
||||
const query::partition_slice&,
|
||||
io_priority,
|
||||
tracing::trace_state_ptr,
|
||||
streamed_mutation::forwarding
|
||||
streamed_mutation::forwarding,
|
||||
mutation_reader::forwarding
|
||||
)>;
|
||||
// We could have our own version of std::function<> that is nothrow
|
||||
// move constructible and save some indirection and allocation.
|
||||
@@ -272,15 +287,15 @@ private:
|
||||
public:
|
||||
mutation_source(func_type fn) : _fn(std::make_unique<func_type>(std::move(fn))) {}
|
||||
mutation_source(std::function<mutation_reader(schema_ptr, partition_range, const query::partition_slice&, io_priority)> fn)
|
||||
: _fn(std::make_unique<func_type>([fn = std::move(fn)] (schema_ptr s, partition_range range, const query::partition_slice& slice, io_priority pc, tracing::trace_state_ptr, streamed_mutation::forwarding) {
|
||||
: _fn(std::make_unique<func_type>([fn = std::move(fn)] (schema_ptr s, partition_range range, const query::partition_slice& slice, io_priority pc, tracing::trace_state_ptr, streamed_mutation::forwarding, mutation_reader::forwarding) {
|
||||
return fn(s, range, slice, pc);
|
||||
})) {}
|
||||
mutation_source(std::function<mutation_reader(schema_ptr, partition_range, const query::partition_slice&)> fn)
|
||||
: _fn(std::make_unique<func_type>([fn = std::move(fn)] (schema_ptr s, partition_range range, const query::partition_slice& slice, io_priority, tracing::trace_state_ptr, streamed_mutation::forwarding) {
|
||||
: _fn(std::make_unique<func_type>([fn = std::move(fn)] (schema_ptr s, partition_range range, const query::partition_slice& slice, io_priority, tracing::trace_state_ptr, streamed_mutation::forwarding, mutation_reader::forwarding) {
|
||||
return fn(s, range, slice);
|
||||
})) {}
|
||||
mutation_source(std::function<mutation_reader(schema_ptr, partition_range range)> fn)
|
||||
: _fn(std::make_unique<func_type>([fn = std::move(fn)] (schema_ptr s, partition_range range, const query::partition_slice&, io_priority, tracing::trace_state_ptr, streamed_mutation::forwarding) {
|
||||
: _fn(std::make_unique<func_type>([fn = std::move(fn)] (schema_ptr s, partition_range range, const query::partition_slice&, io_priority, tracing::trace_state_ptr, streamed_mutation::forwarding, mutation_reader::forwarding) {
|
||||
return fn(s, range);
|
||||
})) {}
|
||||
|
||||
@@ -304,9 +319,10 @@ public:
|
||||
const query::partition_slice& slice = query::full_slice,
|
||||
io_priority pc = default_priority_class(),
|
||||
tracing::trace_state_ptr trace_state = nullptr,
|
||||
streamed_mutation::forwarding fwd = streamed_mutation::forwarding::no) const
|
||||
streamed_mutation::forwarding fwd = streamed_mutation::forwarding::no,
|
||||
mutation_reader::forwarding fwd_mr = mutation_reader::forwarding::yes) const
|
||||
{
|
||||
return (*_fn)(std::move(s), range, slice, pc, std::move(trace_state), fwd);
|
||||
return (*_fn)(std::move(s), range, slice, pc, std::move(trace_state), fwd, fwd_mr);
|
||||
}
|
||||
};
|
||||
|
||||
@@ -463,4 +479,5 @@ stable_flattened_mutations_consumer<FlattenedConsumer> make_stable_flattened_mut
|
||||
mutation_reader
|
||||
make_multi_range_reader(schema_ptr s, mutation_source source, const dht::partition_range_vector& ranges,
|
||||
const query::partition_slice& slice, const io_priority_class& pc = default_priority_class(),
|
||||
tracing::trace_state_ptr trace_state = nullptr, streamed_mutation::forwarding fwd = streamed_mutation::forwarding::no);
|
||||
tracing::trace_state_ptr trace_state = nullptr, streamed_mutation::forwarding fwd = streamed_mutation::forwarding::no,
|
||||
mutation_reader::forwarding fwd_mr = mutation_reader::forwarding::yes);
|
||||
|
||||
19
row_cache.cc
19
row_cache.cc
@@ -551,6 +551,7 @@ class scanning_and_populating_reader final : public mutation_reader::impl {
|
||||
just_cache_scanning_reader _primary_reader;
|
||||
range_populating_reader _secondary_reader;
|
||||
streamed_mutation::forwarding _fwd;
|
||||
mutation_reader::forwarding _fwd_mr;
|
||||
streamed_mutation_opt _next_primary;
|
||||
bool _secondary_in_progress = false;
|
||||
bool _first_element = true;
|
||||
@@ -628,12 +629,14 @@ public:
|
||||
const query::partition_slice& slice,
|
||||
const io_priority_class& pc,
|
||||
tracing::trace_state_ptr trace_state,
|
||||
streamed_mutation::forwarding fwd)
|
||||
streamed_mutation::forwarding fwd,
|
||||
mutation_reader::forwarding fwd_mr)
|
||||
: _pr(&range)
|
||||
, _schema(s)
|
||||
, _primary_reader(s, cache, range, slice, pc, fwd)
|
||||
, _secondary_reader(cache, s, slice, pc, trace_state, fwd)
|
||||
, _fwd(fwd)
|
||||
, _fwd_mr(fwd_mr)
|
||||
{ }
|
||||
|
||||
future<streamed_mutation_opt> operator()() {
|
||||
@@ -658,8 +661,9 @@ row_cache::make_scanning_reader(schema_ptr s,
|
||||
const io_priority_class& pc,
|
||||
const query::partition_slice& slice,
|
||||
tracing::trace_state_ptr trace_state,
|
||||
streamed_mutation::forwarding fwd) {
|
||||
return make_mutation_reader<scanning_and_populating_reader>(std::move(s), *this, range, slice, pc, std::move(trace_state), fwd);
|
||||
streamed_mutation::forwarding fwd,
|
||||
mutation_reader::forwarding fwd_mr) {
|
||||
return make_mutation_reader<scanning_and_populating_reader>(std::move(s), *this, range, slice, pc, std::move(trace_state), fwd, fwd_mr);
|
||||
}
|
||||
|
||||
mutation_reader
|
||||
@@ -668,12 +672,13 @@ row_cache::make_reader(schema_ptr s,
|
||||
const query::partition_slice& slice,
|
||||
const io_priority_class& pc,
|
||||
tracing::trace_state_ptr trace_state,
|
||||
streamed_mutation::forwarding fwd) {
|
||||
streamed_mutation::forwarding fwd,
|
||||
mutation_reader::forwarding fwd_mr) {
|
||||
if (range.is_singular()) {
|
||||
const query::ring_position& pos = range.start()->value();
|
||||
|
||||
if (!pos.has_key()) {
|
||||
return make_scanning_reader(std::move(s), range, pc, slice, std::move(trace_state), fwd);
|
||||
return make_scanning_reader(std::move(s), range, pc, slice, std::move(trace_state), fwd, fwd_mr);
|
||||
}
|
||||
|
||||
return _read_section(_tracker.region(), [&] {
|
||||
@@ -686,7 +691,7 @@ row_cache::make_reader(schema_ptr s,
|
||||
upgrade_entry(e);
|
||||
mutation_reader reader;
|
||||
if (e.wide_partition()) {
|
||||
reader = _underlying(s, range, slice, pc, std::move(trace_state), fwd);
|
||||
reader = _underlying(s, range, slice, pc, std::move(trace_state), fwd, fwd_mr);
|
||||
_tracker.on_uncached_wide_partition();
|
||||
on_miss();
|
||||
} else {
|
||||
@@ -704,7 +709,7 @@ row_cache::make_reader(schema_ptr s,
|
||||
});
|
||||
}
|
||||
|
||||
return make_scanning_reader(std::move(s), range, pc, slice, std::move(trace_state), fwd);
|
||||
return make_scanning_reader(std::move(s), range, pc, slice, std::move(trace_state), fwd, fwd_mr);
|
||||
}
|
||||
|
||||
row_cache::~row_cache() {
|
||||
|
||||
@@ -286,7 +286,8 @@ private:
|
||||
const io_priority_class& pc,
|
||||
const query::partition_slice& slice,
|
||||
tracing::trace_state_ptr trace_state,
|
||||
streamed_mutation::forwarding);
|
||||
streamed_mutation::forwarding,
|
||||
mutation_reader::forwarding);
|
||||
void on_hit();
|
||||
void on_miss();
|
||||
void on_uncached_wide_partition();
|
||||
@@ -338,7 +339,8 @@ public:
|
||||
const query::partition_slice& slice = query::full_slice,
|
||||
const io_priority_class& = default_priority_class(),
|
||||
tracing::trace_state_ptr trace_state = nullptr,
|
||||
streamed_mutation::forwarding fwd = streamed_mutation::forwarding::no);
|
||||
streamed_mutation::forwarding fwd = streamed_mutation::forwarding::no,
|
||||
mutation_reader::forwarding fwd_mr = mutation_reader::forwarding::no);
|
||||
|
||||
const stats& stats() const { return _stats; }
|
||||
public:
|
||||
|
||||
@@ -34,9 +34,10 @@ public:
|
||||
const dht::partition_range& pr,
|
||||
const query::partition_slice& slice,
|
||||
const io_priority_class& pc,
|
||||
streamed_mutation::forwarding fwd)
|
||||
streamed_mutation::forwarding fwd,
|
||||
mutation_reader::forwarding fwd_mr)
|
||||
: _sst(sst)
|
||||
, _smr(sst->read_range_rows(std::move(s), pr, slice, pc, fwd)) {
|
||||
, _smr(sst->read_range_rows(std::move(s), pr, slice, pc, fwd, fwd_mr)) {
|
||||
}
|
||||
virtual future<streamed_mutation_opt> operator()() override {
|
||||
return _smr.read();
|
||||
|
||||
@@ -764,12 +764,12 @@ struct sstable_data_source : public enable_lw_shared_from_this<sstable_data_sour
|
||||
, _schema(std::move(s))
|
||||
{ }
|
||||
|
||||
sstable_data_source(schema_ptr s, shared_sstable sst, mp_row_consumer&& consumer, sstable::disk_read_range toread,
|
||||
sstable_data_source(schema_ptr s, shared_sstable sst, mp_row_consumer&& consumer, sstable::disk_read_range toread, uint64_t last_end,
|
||||
std::unique_ptr<index_reader> lh_index = {}, std::unique_ptr<index_reader> rh_index = {})
|
||||
: _sst(std::move(sst))
|
||||
, _consumer(std::move(consumer))
|
||||
, _read_enabled(bool(toread))
|
||||
, _context(_sst->data_consume_rows(_consumer, std::move(toread)))
|
||||
, _context(_sst->data_consume_rows(_consumer, std::move(toread), last_end))
|
||||
, _lh_index(std::move(lh_index))
|
||||
, _rh_index(std::move(rh_index))
|
||||
, _schema(std::move(s))
|
||||
@@ -1051,12 +1051,12 @@ private:
|
||||
lw_shared_ptr<sstable_data_source> _ds;
|
||||
std::function<future<lw_shared_ptr<sstable_data_source>> ()> _get_data_source;
|
||||
public:
|
||||
impl(shared_sstable sst, schema_ptr schema, sstable::disk_read_range toread,
|
||||
impl(shared_sstable sst, schema_ptr schema, sstable::disk_read_range toread, uint64_t last_end,
|
||||
const io_priority_class &pc,
|
||||
streamed_mutation::forwarding fwd)
|
||||
: _get_data_source([this, sst = std::move(sst), s = std::move(schema), toread, &pc, fwd] {
|
||||
: _get_data_source([this, sst = std::move(sst), s = std::move(schema), toread, last_end, &pc, fwd] {
|
||||
auto consumer = mp_row_consumer(s, query::full_slice, pc, fwd);
|
||||
auto ds = make_lw_shared<sstable_data_source>(std::move(s), std::move(sst), std::move(consumer), std::move(toread));
|
||||
auto ds = make_lw_shared<sstable_data_source>(std::move(s), std::move(sst), std::move(consumer), std::move(toread), last_end);
|
||||
return make_ready_future<lw_shared_ptr<sstable_data_source>>(std::move(ds));
|
||||
}) { }
|
||||
impl(shared_sstable sst, schema_ptr schema,
|
||||
@@ -1072,16 +1072,17 @@ public:
|
||||
const dht::partition_range& pr,
|
||||
const query::partition_slice& slice,
|
||||
const io_priority_class& pc,
|
||||
streamed_mutation::forwarding fwd)
|
||||
: _get_data_source([this, pr, sst = std::move(sst), s = std::move(schema), &pc, &slice, fwd] () mutable {
|
||||
streamed_mutation::forwarding fwd,
|
||||
::mutation_reader::forwarding fwd_mr)
|
||||
: _get_data_source([this, pr, sst = std::move(sst), s = std::move(schema), &pc, &slice, fwd, fwd_mr] () mutable {
|
||||
auto lh_index = sst->get_index_reader(pc); // lh = left hand
|
||||
auto rh_index = sst->get_index_reader(pc);
|
||||
auto f = seastar::when_all_succeed(lh_index->advance_to_start(pr), rh_index->advance_to_end(pr));
|
||||
return f.then([this, lh_index = std::move(lh_index), rh_index = std::move(rh_index), sst = std::move(sst), s = std::move(s), &pc, &slice, fwd] () mutable {
|
||||
return f.then([this, lh_index = std::move(lh_index), rh_index = std::move(rh_index), sst = std::move(sst), s = std::move(s), &pc, &slice, fwd, fwd_mr] () mutable {
|
||||
sstable::disk_read_range drr{lh_index->data_file_position(),
|
||||
rh_index->data_file_position()};
|
||||
auto consumer = mp_row_consumer(s, slice, pc, fwd);
|
||||
auto ds = make_lw_shared<sstable_data_source>(std::move(s), std::move(sst), std::move(consumer), drr, std::move(lh_index), std::move(rh_index));
|
||||
auto ds = make_lw_shared<sstable_data_source>(std::move(s), std::move(sst), std::move(consumer), drr, (fwd_mr ? sst->data_size() : drr.end), std::move(lh_index), std::move(rh_index));
|
||||
ds->_index_in_current_partition = true;
|
||||
ds->_will_likely_slice = sstable_data_source::will_likely_slice(slice);
|
||||
return ds;
|
||||
@@ -1286,9 +1287,10 @@ sstable::read_range_rows(schema_ptr schema,
|
||||
const dht::partition_range& range,
|
||||
const query::partition_slice& slice,
|
||||
const io_priority_class& pc,
|
||||
streamed_mutation::forwarding fwd) {
|
||||
streamed_mutation::forwarding fwd,
|
||||
::mutation_reader::forwarding fwd_mr) {
|
||||
return std::make_unique<mutation_reader::impl>(
|
||||
shared_from_this(), std::move(schema), range, slice, pc, fwd);
|
||||
shared_from_this(), std::move(schema), range, slice, pc, fwd, fwd_mr);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -444,9 +444,14 @@ future<> data_consume_context::skip_to(indexable_element el, uint64_t begin) {
|
||||
}
|
||||
|
||||
data_consume_context sstable::data_consume_rows(
|
||||
row_consumer& consumer, sstable::disk_read_range toread) {
|
||||
row_consumer& consumer, sstable::disk_read_range toread, uint64_t last_end) {
|
||||
// Although we were only asked to read until toread.end, we'll not limit
|
||||
// the underlying file input stream to this end, but rather to last_end.
|
||||
// This potentially enables read-ahead beyond end, until last_end, which
|
||||
// can be beneficial if the user wants to fast_forward_to() on the
|
||||
// returned context, and may make small skips.
|
||||
return std::make_unique<data_consume_context::impl>(shared_from_this(),
|
||||
consumer, data_stream(toread.start, data_size() - toread.start,
|
||||
consumer, data_stream(toread.start, last_end - toread.start,
|
||||
consumer.io_priority(), _partition_range_history), toread.start, toread.end - toread.start);
|
||||
}
|
||||
|
||||
@@ -459,7 +464,7 @@ data_consume_context sstable::data_consume_single_partition(
|
||||
|
||||
|
||||
data_consume_context sstable::data_consume_rows(row_consumer& consumer) {
|
||||
return data_consume_rows(consumer, {0, data_size()});
|
||||
return data_consume_rows(consumer, {0, data_size()}, data_size());
|
||||
}
|
||||
|
||||
future<> sstable::data_consume_rows_at_once(row_consumer& consumer,
|
||||
|
||||
@@ -2824,12 +2824,13 @@ mutation_source sstable::as_mutation_source() {
|
||||
const query::partition_slice& slice,
|
||||
const io_priority_class& pc,
|
||||
tracing::trace_state_ptr trace_ptr,
|
||||
streamed_mutation::forwarding fwd) mutable {
|
||||
if (range.is_singular() && range.start()->value().has_key()) {
|
||||
streamed_mutation::forwarding fwd,
|
||||
::mutation_reader::forwarding fwd_mr) mutable {
|
||||
if (!fwd_mr && range.is_singular() && range.start()->value().has_key()) {
|
||||
const dht::ring_position& pos = range.start()->value();
|
||||
return make_mutation_reader<single_partition_reader_adaptor>(sst, s, pos, slice, pc, fwd);
|
||||
} else {
|
||||
return make_mutation_reader<range_reader_adaptor>(sst, sst->read_range_rows(s, range, slice, pc, fwd));
|
||||
return make_mutation_reader<range_reader_adaptor>(sst, sst->read_range_rows(s, range, slice, pc, fwd, fwd_mr));
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@@ -188,6 +188,7 @@ public:
|
||||
// TODO: this should become a vector of ranges
|
||||
uint64_t start;
|
||||
uint64_t end;
|
||||
|
||||
disk_read_range() : start(0), end(0) {}
|
||||
disk_read_range(uint64_t start, uint64_t end) :
|
||||
start(start), end(end) { }
|
||||
@@ -212,7 +213,22 @@ public:
|
||||
// The caller must ensure (e.g., using do_with()) that the context object,
|
||||
// as well as the sstable, remains alive as long as a read() is in
|
||||
// progress (i.e., returned a future which hasn't completed yet).
|
||||
data_consume_context data_consume_rows(row_consumer& consumer, disk_read_range toread);
|
||||
//
|
||||
// The "toread" range specifies the range we want to read initially.
|
||||
// However, the object returned by the read, a data_consume_context, also
|
||||
// provides a fast_forward_to(start,end) method which allows resetting
|
||||
// the reader to a new range. To allow that, we also have a "last_end"
|
||||
// byte which should be the last end to which fast_forward_to is
|
||||
// eventually allowed. If last_end==end, fast_forward_to is not allowed
|
||||
// at all, if last_end==file_size fast_forward_to is allowed until the
|
||||
// end of the file, and it can be something in between if we know that we
|
||||
// are planning to skip parts, but eventually read until last_end.
|
||||
// When last_end==end, we guarantee that the read will only read the
|
||||
// desired byte range from disk. However, when last_end > end, we may
|
||||
// read beyond end in anticipation of a small skip via fast_foward_to.
|
||||
// The amount of this excessive read is controlled by read ahead
|
||||
// hueristics which learn from the usefulness of previous read aheads.
|
||||
data_consume_context data_consume_rows(row_consumer& consumer, disk_read_range toread, uint64_t last_end);
|
||||
|
||||
data_consume_context data_consume_single_partition(row_consumer& consumer, disk_read_range toread);
|
||||
|
||||
@@ -271,7 +287,8 @@ public:
|
||||
const dht::partition_range& range,
|
||||
const query::partition_slice& slice = query::full_slice,
|
||||
const io_priority_class& pc = default_priority_class(),
|
||||
streamed_mutation::forwarding fwd = streamed_mutation::forwarding::no);
|
||||
streamed_mutation::forwarding fwd = streamed_mutation::forwarding::no,
|
||||
::mutation_reader::forwarding fwd_mr = ::mutation_reader::forwarding::yes);
|
||||
|
||||
// read_rows() returns each of the rows in the sstable, in sequence,
|
||||
// converted to a "mutation" data structure.
|
||||
|
||||
@@ -64,7 +64,7 @@ struct mutation_less_cmp {
|
||||
};
|
||||
mutation_source make_source(std::vector<mutation> mutations) {
|
||||
return mutation_source([mutations = std::move(mutations)] (schema_ptr s, const dht::partition_range& range, const query::partition_slice& slice,
|
||||
const io_priority_class& pc, tracing::trace_state_ptr, streamed_mutation::forwarding fwd) {
|
||||
const io_priority_class& pc, tracing::trace_state_ptr, streamed_mutation::forwarding fwd, mutation_reader::forwarding fwd_mr) {
|
||||
assert(range.is_full()); // slicing not implemented yet
|
||||
for (auto&& m : mutations) {
|
||||
assert(m.schema() == s);
|
||||
|
||||
@@ -590,8 +590,9 @@ SEASTAR_TEST_CASE(test_row_cache_conforms_to_mutation_source) {
|
||||
const query::partition_slice& slice,
|
||||
const io_priority_class& pc,
|
||||
tracing::trace_state_ptr trace_state,
|
||||
streamed_mutation::forwarding fwd) {
|
||||
return cache->make_reader(s, range, slice, pc, std::move(trace_state), fwd);
|
||||
streamed_mutation::forwarding fwd,
|
||||
mutation_reader::forwarding fwd_mr) {
|
||||
return cache->make_reader(s, range, slice, pc, std::move(trace_state), fwd, fwd_mr);
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
@@ -3505,7 +3505,8 @@ SEASTAR_TEST_CASE(test_skipping_using_index) {
|
||||
query::full_slice,
|
||||
default_priority_class(),
|
||||
nullptr,
|
||||
streamed_mutation::forwarding::yes);
|
||||
streamed_mutation::forwarding::yes,
|
||||
::mutation_reader::forwarding::yes);
|
||||
|
||||
// Consume first partition completely so that index is stale
|
||||
{
|
||||
|
||||
@@ -474,7 +474,7 @@ SEASTAR_TEST_CASE(compressed_row_read_at_once) {
|
||||
SEASTAR_TEST_CASE(uncompressed_rows_read_one) {
|
||||
return reusable_sst(uncompressed_schema(), "tests/sstables/uncompressed", 1).then([] (auto sstp) {
|
||||
return do_with(test_row_consumer(1418656871665302), [sstp] (auto& c) {
|
||||
auto context = sstp->data_consume_rows(c, {0, 95});
|
||||
auto context = sstp->data_consume_rows(c, {0, 95}, 95);
|
||||
auto fut = context.read();
|
||||
return fut.then([sstp, &c, context = std::move(context)] {
|
||||
BOOST_REQUIRE(c.count_row_start == 1);
|
||||
@@ -492,7 +492,7 @@ SEASTAR_TEST_CASE(compressed_rows_read_one) {
|
||||
auto s = make_lw_shared(schema({}, "ks", "cf", {}, {}, {}, {}, utf8_type));
|
||||
return reusable_sst(std::move(s), "tests/sstables/compressed", 1).then([] (auto sstp) {
|
||||
return do_with(test_row_consumer(1418654707438005), [sstp] (auto& c) {
|
||||
auto context = sstp->data_consume_rows(c, {0, 95});
|
||||
auto context = sstp->data_consume_rows(c, {0, 95}, 95);
|
||||
auto fut = context.read();
|
||||
return fut.then([sstp, &c, context = std::move(context)] {
|
||||
BOOST_REQUIRE(c.count_row_start == 1);
|
||||
|
||||
@@ -184,11 +184,12 @@ SEASTAR_TEST_CASE(test_mutation_merger_conforms_to_mutation_source) {
|
||||
const query::partition_slice& slice,
|
||||
const io_priority_class& pc,
|
||||
tracing::trace_state_ptr trace_state,
|
||||
streamed_mutation::forwarding fwd)
|
||||
streamed_mutation::forwarding fwd,
|
||||
mutation_reader::forwarding fwd_mr)
|
||||
{
|
||||
std::vector<mutation_reader> readers;
|
||||
for (int i = 0; i < n; ++i) {
|
||||
readers.push_back(memtables[i]->make_reader(s, range, slice, pc, trace_state, fwd));
|
||||
readers.push_back(memtables[i]->make_reader(s, range, slice, pc, trace_state, fwd, fwd_mr));
|
||||
}
|
||||
return make_combined_reader(std::move(readers));
|
||||
});
|
||||
|
||||
Reference in New Issue
Block a user