Merge data_consume_context::impl into data_consume_context

There's no reason to use pimpl in data_consume_context

Signed-off-by: Piotr Jastrzebski <piotr@scylladb.com>
This commit is contained in:
Piotr Jastrzebski
2017-11-17 16:13:51 +01:00
parent 644f9d9883
commit 7f8b62bc0b
2 changed files with 34 additions and 55 deletions

View File

@@ -387,66 +387,39 @@ public:
}
};
// data_consume_rows() and data_consume_rows_at_once() both can read just a
// single row or many rows. The difference is that data_consume_rows_at_once()
// is optimized to reading one or few rows (reading it all into memory), while
// data_consume_rows() uses a read buffer, so not all the rows need to fit
// memory in the same time (they are delivered to the consumer one by one).
class data_consume_context::impl {
private:
shared_sstable _sst;
std::unique_ptr<data_consume_rows_context> _ctx;
public:
impl(shared_sstable sst, row_consumer& consumer, input_stream<char>&& input, uint64_t start, uint64_t maxlen)
: _sst(std::move(sst))
, _ctx(new data_consume_rows_context(consumer, std::move(input), start, maxlen))
{ }
~impl() {
if (_ctx) {
auto f = _ctx->close();
f.handle_exception([ctx = std::move(_ctx), sst = std::move(_sst)] (auto) { });
}
}
future<> read() {
return _ctx->consume_input(*_ctx);
}
future<> fast_forward_to(uint64_t begin, uint64_t end) {
_ctx->reset(indexable_element::partition);
return _ctx->fast_forward_to(begin, end);
}
future<> skip_to(indexable_element el, uint64_t begin) {
sstlog.trace("data_consume_rows_context {}: skip_to({} -> {}, el={})", _ctx.get(), _ctx->position(), begin, static_cast<int>(el));
if (begin <= _ctx->position()) {
return make_ready_future<>();
}
_ctx->reset(el);
return _ctx->skip_to(begin);
}
bool eof() const {
return _ctx->eof();
data_consume_context::~data_consume_context() {
if (_ctx) {
auto f = _ctx->close();
f.handle_exception([ctx = std::move(_ctx), sst = std::move(_sst)](auto) {});
}
};
data_consume_context::~data_consume_context() = default;
data_consume_context::data_consume_context(data_consume_context&& o) noexcept
: _pimpl(std::move(o._pimpl))
: _ctx(std::move(o._ctx))
{ }
data_consume_context& data_consume_context::operator=(data_consume_context&& o) noexcept {
_pimpl = std::move(o._pimpl);
_ctx = std::move(o._ctx);
return *this;
}
data_consume_context::data_consume_context(std::unique_ptr<impl> p) : _pimpl(std::move(p)) { }
data_consume_context::data_consume_context(shared_sstable sst, row_consumer& consumer, input_stream<char>&& input, uint64_t start, uint64_t maxlen)
: _sst(std::move(sst)), _ctx(new data_consume_rows_context(consumer, std::move(input), start, maxlen))
{ }
future<> data_consume_context::read() {
return _pimpl->read();
return _ctx->consume_input(*_ctx);
}
future<> data_consume_context::fast_forward_to(uint64_t begin, uint64_t end) {
return _pimpl->fast_forward_to(begin, end);
_ctx->reset(indexable_element::partition);
return _ctx->fast_forward_to(begin, end);
}
future<> data_consume_context::skip_to(indexable_element el, uint64_t begin) {
return _pimpl->skip_to(el, begin);
sstlog.trace("data_consume_rows_context {}: skip_to({} -> {}, el={})", _ctx.get(), _ctx->position(), begin, static_cast<int>(el));
if (begin <= _ctx->position()) {
return make_ready_future<>();
}
_ctx->reset(el);
return _ctx->skip_to(begin);
}
bool data_consume_context::eof() const {
return _pimpl->eof();
return _ctx->eof();
}
data_consume_context sstable::data_consume_rows(
@@ -456,16 +429,14 @@ data_consume_context sstable::data_consume_rows(
// This potentially enables read-ahead beyond end, until last_end, which
// can be beneficial if the user wants to fast_forward_to() on the
// returned context, and may make small skips.
return std::make_unique<data_consume_context::impl>(shared_from_this(),
consumer, data_stream(toread.start, last_end - toread.start,
consumer.io_priority(), consumer.resource_tracker(), _partition_range_history), toread.start, toread.end - toread.start);
return { shared_from_this(), consumer, data_stream(toread.start, last_end - toread.start,
consumer.io_priority(), consumer.resource_tracker(), _partition_range_history), toread.start, toread.end - toread.start };
}
data_consume_context sstable::data_consume_single_partition(
row_consumer& consumer, sstable::disk_read_range toread) {
return std::make_unique<data_consume_context::impl>(shared_from_this(),
consumer, data_stream(toread.start, toread.end - toread.start,
consumer.io_priority(), consumer.resource_tracker(), _single_partition_history), toread.start, toread.end - toread.start);
return { shared_from_this(), consumer, data_stream(toread.start, toread.end - toread.start,
consumer.io_priority(), consumer.resource_tracker(), _single_partition_history), toread.start, toread.end - toread.start };
}

View File

@@ -66,6 +66,8 @@ namespace sstables {
extern logging::logger sstlog;
class data_consume_rows_context;
// data_consume_context is an object returned by sstable::data_consume_rows()
// which allows knowing when the consumer stops reading, and starting it again
// (e.g., when the consumer wants to stop after every sstable row).
@@ -81,11 +83,17 @@ extern logging::logger sstlog;
// and the time the returned future is completed, the object lives on.
// Moreover, the sstable object used for the sstable::data_consume_rows()
// call which created this data_consume_context, must also be kept alive.
//
// data_consume_rows() and data_consume_rows_at_once() both can read just a
// single row or many rows. The difference is that data_consume_rows_at_once()
// is optimized to reading one or few rows (reading it all into memory), while
// data_consume_rows() uses a read buffer, so not all the rows need to fit
// memory in the same time (they are delivered to the consumer one by one).
class data_consume_context {
class impl;
std::unique_ptr<impl> _pimpl;
shared_sstable _sst;
std::unique_ptr<data_consume_rows_context> _ctx;
// This object can only be constructed by sstable::data_consume_rows()
data_consume_context(std::unique_ptr<impl>);
data_consume_context(shared_sstable,row_consumer& consumer, input_stream<char>&& input, uint64_t start, uint64_t maxlen);
friend class sstable;
public:
future<> read();