Merge "repair: row_level: prevent deadlocks when repairing homogenous nodes" from Botond
" Row level repair, when using a local reader, is prone to deadlocking on the streaming reader concurrency semaphore. This has been observed to happen with at least two participating nodes, running more concurrent repairs than the maximum allowed amount of reads by the concurrency semaphore. In this situation, it is possible that two repair instances, competing for the last available permits on both nodes, get a permit on one of the nodes and get queued on the other one respectively. As neither will let go of the permit it already acquired, nor give up waiting on the failed-to-acquired permit, a deadlock happens. To prevent this, we make the local repair reader evictable. For this we reuse the already existing evictable reader mechanism of the multishard combining reader. This patchset refactors this evictable reader mechanism into a standalone flat mutation reader, then exposes it to the outside world. The repair reader is paused after the repair buffer is filled, which is currently 32MB, so the cost of a possible reader recreation is amortized over 32MB read. The repair reader is said to be local, when it can use the shard-local partitioner. This is the case if the participating nodes are homogenous (their shard configuration is identical), that is the repair instance has to read just from one shard. A non-local reader uses the multishard reader, which already makes its shard readers evictable and hence is not prone to the deadlock described here. Fixes: #6272 Tests: unit(dev, release, debug) " * 'repair-row-level-evictable-local-reader/v3' of https://github.com/denesb/scylla: repair: row_level: destroy reader on EOS or error repair: row_level: use evictable_reader for local reads mutation_reader: expose evictable_reader mutation_reader: evictable_reader: add auto_pause flag mutation_reader: make evictable_reader a flat_mutation_reader mutation_reader: s/inactive_shard_read/inactive_evictable_reader/ mutation_reader: move inactive_shard_reader code up mutation_reader: fix indentation mutation_reader: shard_reader: extract remote_reader as evictable_reader mutation_reader: reader_lifecycle_policy: make semaphore() available early
This commit is contained in:
13
database.cc
13
database.cc
@@ -2029,9 +2029,10 @@ flat_mutation_reader make_multishard_streaming_reader(distributed<database>& db,
|
||||
reader_concurrency_semaphore* semaphore;
|
||||
};
|
||||
distributed<database>& _db;
|
||||
utils::UUID _table_id;
|
||||
std::vector<reader_context> _contexts;
|
||||
public:
|
||||
explicit streaming_reader_lifecycle_policy(distributed<database>& db) : _db(db), _contexts(smp::count) {
|
||||
streaming_reader_lifecycle_policy(distributed<database>& db, utils::UUID table_id) : _db(db), _table_id(table_id), _contexts(smp::count) {
|
||||
}
|
||||
virtual flat_mutation_reader create_reader(
|
||||
schema_ptr schema,
|
||||
@@ -2060,7 +2061,12 @@ flat_mutation_reader make_multishard_streaming_reader(distributed<database>& db,
|
||||
});
|
||||
}
|
||||
virtual reader_concurrency_semaphore& semaphore() override {
|
||||
return *_contexts[this_shard_id()].semaphore;
|
||||
const auto shard = this_shard_id();
|
||||
if (!_contexts[shard].semaphore) {
|
||||
auto& cf = _db.local().find_column_family(_table_id);
|
||||
_contexts[shard].semaphore = &cf.streaming_read_concurrency_semaphore();
|
||||
}
|
||||
return *_contexts[shard].semaphore;
|
||||
}
|
||||
};
|
||||
auto ms = mutation_source([&db] (schema_ptr s,
|
||||
@@ -2071,7 +2077,8 @@ flat_mutation_reader make_multishard_streaming_reader(distributed<database>& db,
|
||||
tracing::trace_state_ptr trace_state,
|
||||
streamed_mutation::forwarding,
|
||||
mutation_reader::forwarding fwd_mr) {
|
||||
return make_multishard_combining_reader(make_shared<streaming_reader_lifecycle_policy>(db), std::move(s), pr, ps, pc,
|
||||
auto table_id = s->id();
|
||||
return make_multishard_combining_reader(make_shared<streaming_reader_lifecycle_policy>(db, table_id), std::move(s), pr, ps, pc,
|
||||
std::move(trace_state), fwd_mr);
|
||||
});
|
||||
auto&& full_slice = schema->full_slice();
|
||||
|
||||
@@ -199,6 +199,7 @@ class read_context : public reader_lifecycle_policy {
|
||||
|
||||
// One for each shard. Index is shard id.
|
||||
std::vector<reader_meta> _readers;
|
||||
std::vector<reader_concurrency_semaphore*> _semaphores;
|
||||
|
||||
gate _dismantling_gate;
|
||||
|
||||
@@ -215,7 +216,8 @@ public:
|
||||
, _schema(std::move(s))
|
||||
, _cmd(cmd)
|
||||
, _ranges(ranges)
|
||||
, _trace_state(std::move(trace_state)) {
|
||||
, _trace_state(std::move(trace_state))
|
||||
, _semaphores(smp::count, nullptr) {
|
||||
_readers.resize(smp::count);
|
||||
}
|
||||
|
||||
@@ -240,7 +242,11 @@ public:
|
||||
virtual void destroy_reader(shard_id shard, future<stopped_reader> reader_fut) noexcept override;
|
||||
|
||||
virtual reader_concurrency_semaphore& semaphore() override {
|
||||
return _readers[this_shard_id()].rparts->permit.semaphore();
|
||||
const auto shard = this_shard_id();
|
||||
if (!_semaphores[shard]) {
|
||||
_semaphores[shard] = &_db.local().make_query_class_config().semaphore;
|
||||
}
|
||||
return *_semaphores[shard];
|
||||
}
|
||||
|
||||
future<> lookup_readers();
|
||||
|
||||
@@ -980,6 +980,435 @@ flat_mutation_reader make_foreign_reader(schema_ptr schema,
|
||||
|
||||
namespace {
|
||||
|
||||
struct fill_buffer_result {
|
||||
foreign_ptr<std::unique_ptr<const circular_buffer<mutation_fragment>>> buffer;
|
||||
bool end_of_stream = false;
|
||||
|
||||
fill_buffer_result() = default;
|
||||
fill_buffer_result(circular_buffer<mutation_fragment> buffer, bool end_of_stream)
|
||||
: buffer(make_foreign(std::make_unique<const circular_buffer<mutation_fragment>>(std::move(buffer))))
|
||||
, end_of_stream(end_of_stream) {
|
||||
}
|
||||
};
|
||||
|
||||
class inactive_evictable_reader : public reader_concurrency_semaphore::inactive_read {
|
||||
flat_mutation_reader_opt _reader;
|
||||
public:
|
||||
inactive_evictable_reader(flat_mutation_reader reader)
|
||||
: _reader(std::move(reader)) {
|
||||
}
|
||||
flat_mutation_reader reader() && {
|
||||
return std::move(*_reader);
|
||||
}
|
||||
virtual void evict() override {
|
||||
_reader = {};
|
||||
}
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
// Encapsulates all data and logic that is local to the remote shard the
|
||||
// reader lives on.
|
||||
class evictable_reader : public flat_mutation_reader::impl {
|
||||
public:
|
||||
using auto_pause = bool_class<class auto_pause_tag>;
|
||||
|
||||
private:
|
||||
auto_pause _auto_pause;
|
||||
mutation_source _ms;
|
||||
reader_permit _permit;
|
||||
const dht::partition_range* _pr;
|
||||
const query::partition_slice& _ps;
|
||||
const io_priority_class& _pc;
|
||||
tracing::global_trace_state_ptr _trace_state;
|
||||
const mutation_reader::forwarding _fwd_mr;
|
||||
reader_concurrency_semaphore::inactive_read_handle _irh;
|
||||
bool _reader_created = false;
|
||||
bool _drop_partition_start = false;
|
||||
bool _drop_static_row = false;
|
||||
position_in_partition::tri_compare _tri_cmp;
|
||||
|
||||
std::optional<dht::decorated_key> _last_pkey;
|
||||
position_in_partition _next_position_in_partition = position_in_partition::for_partition_start();
|
||||
// These are used when the reader has to be recreated (after having been
|
||||
// evicted while paused) and the range and/or slice it is recreated with
|
||||
// differs from the original ones.
|
||||
std::optional<dht::partition_range> _range_override;
|
||||
std::optional<query::partition_slice> _slice_override;
|
||||
bool _pending_next_partition = false;
|
||||
|
||||
flat_mutation_reader_opt _reader;
|
||||
|
||||
private:
|
||||
void do_pause(flat_mutation_reader reader);
|
||||
void maybe_pause(flat_mutation_reader reader);
|
||||
flat_mutation_reader_opt try_resume();
|
||||
void update_next_position(flat_mutation_reader& reader);
|
||||
void adjust_partition_slice();
|
||||
flat_mutation_reader recreate_reader();
|
||||
flat_mutation_reader resume_or_create_reader();
|
||||
bool should_drop_fragment(const mutation_fragment& mf);
|
||||
future<> do_fill_buffer(flat_mutation_reader& reader, db::timeout_clock::time_point timeout);
|
||||
future<> fill_buffer(flat_mutation_reader& reader, db::timeout_clock::time_point timeout);
|
||||
|
||||
public:
|
||||
evictable_reader(
|
||||
auto_pause ap,
|
||||
mutation_source ms,
|
||||
schema_ptr schema,
|
||||
reader_permit permit,
|
||||
const dht::partition_range& pr,
|
||||
const query::partition_slice& ps,
|
||||
const io_priority_class& pc,
|
||||
tracing::trace_state_ptr trace_state,
|
||||
mutation_reader::forwarding fwd_mr);
|
||||
~evictable_reader();
|
||||
virtual future<> fill_buffer(db::timeout_clock::time_point timeout) override;
|
||||
virtual void next_partition() override;
|
||||
virtual future<> fast_forward_to(const dht::partition_range& pr, db::timeout_clock::time_point timeout) override;
|
||||
virtual future<> fast_forward_to(position_range, db::timeout_clock::time_point timeout) override {
|
||||
throw_with_backtrace<std::bad_function_call>();
|
||||
}
|
||||
reader_concurrency_semaphore::inactive_read_handle inactive_read_handle() && {
|
||||
return std::move(_irh);
|
||||
}
|
||||
void pause() {
|
||||
if (_reader) {
|
||||
do_pause(std::move(*_reader));
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
void evictable_reader::do_pause(flat_mutation_reader reader) {
|
||||
_irh = _permit.semaphore().register_inactive_read(std::make_unique<inactive_evictable_reader>(std::move(reader)));
|
||||
}
|
||||
|
||||
void evictable_reader::maybe_pause(flat_mutation_reader reader) {
|
||||
if (_auto_pause) {
|
||||
do_pause(std::move(reader));
|
||||
} else {
|
||||
_reader = std::move(reader);
|
||||
}
|
||||
}
|
||||
|
||||
flat_mutation_reader_opt evictable_reader::try_resume() {
|
||||
auto ir_ptr = _permit.semaphore().unregister_inactive_read(std::move(_irh));
|
||||
if (!ir_ptr) {
|
||||
return {};
|
||||
}
|
||||
auto& ir = static_cast<inactive_evictable_reader&>(*ir_ptr);
|
||||
return std::move(ir).reader();
|
||||
}
|
||||
|
||||
void evictable_reader::update_next_position(flat_mutation_reader& reader) {
|
||||
if (is_buffer_empty()) {
|
||||
return;
|
||||
}
|
||||
|
||||
auto rbegin = std::reverse_iterator(buffer().end());
|
||||
auto rend = std::reverse_iterator(buffer().begin());
|
||||
if (auto pk_it = std::find_if(rbegin, rend, std::mem_fn(&mutation_fragment::is_partition_start)); pk_it != rend) {
|
||||
_last_pkey = pk_it->as_partition_start().key();
|
||||
}
|
||||
|
||||
const auto last_pos = buffer().back().position();
|
||||
switch (last_pos.region()) {
|
||||
case partition_region::partition_start:
|
||||
_next_position_in_partition = position_in_partition::for_static_row();
|
||||
break;
|
||||
case partition_region::static_row:
|
||||
_next_position_in_partition = position_in_partition::before_all_clustered_rows();
|
||||
break;
|
||||
case partition_region::clustered:
|
||||
if (reader.is_buffer_empty()) {
|
||||
_next_position_in_partition = position_in_partition::after_key(last_pos);
|
||||
} else {
|
||||
const auto& next_frag = reader.peek_buffer();
|
||||
if (next_frag.is_end_of_partition()) {
|
||||
push_mutation_fragment(reader.pop_mutation_fragment());
|
||||
_next_position_in_partition = position_in_partition::for_partition_start();
|
||||
} else {
|
||||
_next_position_in_partition = position_in_partition(next_frag.position());
|
||||
}
|
||||
}
|
||||
break;
|
||||
case partition_region::partition_end:
|
||||
_next_position_in_partition = position_in_partition::for_partition_start();
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
void evictable_reader::adjust_partition_slice() {
|
||||
if (!_slice_override) {
|
||||
_slice_override = _ps;
|
||||
}
|
||||
|
||||
auto ranges = _slice_override->default_row_ranges();
|
||||
query::trim_clustering_row_ranges_to(*_schema, ranges, _next_position_in_partition);
|
||||
|
||||
_slice_override->clear_ranges();
|
||||
_slice_override->set_range(*_schema, _last_pkey->key(), std::move(ranges));
|
||||
}
|
||||
|
||||
flat_mutation_reader evictable_reader::recreate_reader() {
|
||||
const dht::partition_range* range = _pr;
|
||||
const query::partition_slice* slice = &_ps;
|
||||
|
||||
if (_last_pkey) {
|
||||
bool partition_range_is_inclusive = true;
|
||||
|
||||
switch (_next_position_in_partition.region()) {
|
||||
case partition_region::partition_start:
|
||||
partition_range_is_inclusive = false;
|
||||
break;
|
||||
case partition_region::static_row:
|
||||
_drop_partition_start = true;
|
||||
break;
|
||||
case partition_region::clustered:
|
||||
_drop_partition_start = true;
|
||||
_drop_static_row = true;
|
||||
adjust_partition_slice();
|
||||
slice = &*_slice_override;
|
||||
break;
|
||||
case partition_region::partition_end:
|
||||
partition_range_is_inclusive = false;
|
||||
break;
|
||||
}
|
||||
|
||||
// The original range contained a single partition and we've read it
|
||||
// all. We'd have to create a reader with an empty range that would
|
||||
// immediately be at EOS. This is not possible so just create an empty
|
||||
// reader instead.
|
||||
// This should be extremely rare (who'd create a multishard reader to
|
||||
// read a single partition) but still, let's make sure we handle it
|
||||
// correctly.
|
||||
if (_pr->is_singular() && !partition_range_is_inclusive) {
|
||||
return make_empty_flat_reader(_schema);
|
||||
}
|
||||
|
||||
_range_override = dht::partition_range({dht::partition_range::bound(*_last_pkey, partition_range_is_inclusive)}, _pr->end());
|
||||
range = &*_range_override;
|
||||
}
|
||||
|
||||
return _ms.make_reader(
|
||||
_schema,
|
||||
_permit,
|
||||
*range,
|
||||
*slice,
|
||||
_pc,
|
||||
_trace_state,
|
||||
streamed_mutation::forwarding::no,
|
||||
_fwd_mr);
|
||||
}
|
||||
|
||||
flat_mutation_reader evictable_reader::resume_or_create_reader() {
|
||||
if (!_reader_created) {
|
||||
auto reader = _ms.make_reader(_schema, _permit, *_pr, _ps, _pc, _trace_state, streamed_mutation::forwarding::no, _fwd_mr);
|
||||
_reader_created = true;
|
||||
return reader;
|
||||
}
|
||||
if (_reader) {
|
||||
return std::move(*_reader);
|
||||
}
|
||||
if (auto reader_opt = try_resume()) {
|
||||
return std::move(*reader_opt);
|
||||
}
|
||||
return recreate_reader();
|
||||
}
|
||||
|
||||
bool evictable_reader::should_drop_fragment(const mutation_fragment& mf) {
|
||||
if (_drop_partition_start && mf.is_partition_start()) {
|
||||
_drop_partition_start = false;
|
||||
return true;
|
||||
}
|
||||
if (_drop_static_row && mf.is_static_row()) {
|
||||
_drop_static_row = false;
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
future<> evictable_reader::do_fill_buffer(flat_mutation_reader& reader, db::timeout_clock::time_point timeout) {
|
||||
if (!_drop_partition_start && !_drop_static_row) {
|
||||
return reader.fill_buffer(timeout);
|
||||
}
|
||||
return repeat([this, &reader, timeout] {
|
||||
return reader.fill_buffer(timeout).then([this, &reader] {
|
||||
while (!reader.is_buffer_empty() && should_drop_fragment(reader.peek_buffer())) {
|
||||
reader.pop_mutation_fragment();
|
||||
}
|
||||
return stop_iteration(reader.is_buffer_full() || reader.is_end_of_stream());
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
future<> evictable_reader::fill_buffer(flat_mutation_reader& reader, db::timeout_clock::time_point timeout) {
|
||||
return do_fill_buffer(reader, timeout).then([this, &reader, timeout] {
|
||||
if (reader.is_buffer_empty()) {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
reader.move_buffer_content_to(*this);
|
||||
auto stop = [this, &reader] {
|
||||
// The only problematic fragment kind is the range tombstone.
|
||||
// All other fragment kinds are safe to end the buffer on, and
|
||||
// are guaranteed to represent progress vs. the last buffer fill.
|
||||
if (!buffer().back().is_range_tombstone()) {
|
||||
return true;
|
||||
}
|
||||
if (reader.is_buffer_empty()) {
|
||||
return reader.is_end_of_stream();
|
||||
}
|
||||
const auto& next_pos = reader.peek_buffer().position();
|
||||
// To ensure safe progress we have to ensure the following:
|
||||
//
|
||||
// _next_position_in_partition < buffer.back().position() < next_pos
|
||||
//
|
||||
// * The first condition is to ensure we made progress since the
|
||||
// last buffer fill. Otherwise we might get into an endless loop if
|
||||
// the reader is recreated after each `fill_buffer()` call.
|
||||
// * The second condition is to ensure we have seen all fragments
|
||||
// with the same position. Otherwise we might jump over those
|
||||
// remaining fragments with the same position as the last
|
||||
// fragment's in the buffer when the reader is recreated.
|
||||
return _tri_cmp(_next_position_in_partition, buffer().back().position()) < 0 && _tri_cmp(buffer().back().position(), next_pos) < 0;
|
||||
};
|
||||
// Read additional fragments until it is safe to stop, if needed.
|
||||
// We have to ensure we stop at a fragment such that if the reader is
|
||||
// evicted and recreated later, we won't be skipping any fragments.
|
||||
// Practically, range tombstones are the only ones that are
|
||||
// problematic to end the buffer on. This is due to the fact range
|
||||
// tombstones can have the same position that multiple following range
|
||||
// tombstones, or a single following clustering row in the stream has.
|
||||
// When a range tombstone is the last in the buffer, we have to continue
|
||||
// to read until we are sure we've read all fragments sharing the same
|
||||
// position, so that we can safely continue reading from after said
|
||||
// position.
|
||||
return do_until(stop, [this, &reader, timeout] {
|
||||
if (reader.is_buffer_empty()) {
|
||||
return do_fill_buffer(reader, timeout);
|
||||
}
|
||||
push_mutation_fragment(reader.pop_mutation_fragment());
|
||||
return make_ready_future<>();
|
||||
});
|
||||
}).then([this, &reader] {
|
||||
update_next_position(reader);
|
||||
});
|
||||
}
|
||||
|
||||
evictable_reader::evictable_reader(
|
||||
auto_pause ap,
|
||||
mutation_source ms,
|
||||
schema_ptr schema,
|
||||
reader_permit permit,
|
||||
const dht::partition_range& pr,
|
||||
const query::partition_slice& ps,
|
||||
const io_priority_class& pc,
|
||||
tracing::trace_state_ptr trace_state,
|
||||
mutation_reader::forwarding fwd_mr)
|
||||
: impl(std::move(schema))
|
||||
, _auto_pause(ap)
|
||||
, _ms(std::move(ms))
|
||||
, _permit(std::move(permit))
|
||||
, _pr(&pr)
|
||||
, _ps(ps)
|
||||
, _pc(pc)
|
||||
, _trace_state(std::move(trace_state))
|
||||
, _fwd_mr(fwd_mr)
|
||||
, _tri_cmp(*_schema) {
|
||||
}
|
||||
|
||||
evictable_reader::~evictable_reader() {
|
||||
try_resume();
|
||||
}
|
||||
|
||||
future<> evictable_reader::fill_buffer(db::timeout_clock::time_point timeout) {
|
||||
const auto pending_next_partition = std::exchange(_pending_next_partition, false);
|
||||
if (pending_next_partition) {
|
||||
_next_position_in_partition = position_in_partition::for_partition_start();
|
||||
}
|
||||
if (is_end_of_stream()) {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
return do_with(resume_or_create_reader(),
|
||||
[this, pending_next_partition, timeout] (flat_mutation_reader& reader) mutable {
|
||||
if (pending_next_partition) {
|
||||
reader.next_partition();
|
||||
}
|
||||
|
||||
return fill_buffer(reader, timeout).then([this, &reader] {
|
||||
_end_of_stream = reader.is_end_of_stream() && reader.is_buffer_empty();
|
||||
maybe_pause(std::move(reader));
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
void evictable_reader::next_partition() {
|
||||
clear_buffer_to_next_partition();
|
||||
if (is_buffer_empty()) {
|
||||
_pending_next_partition = true;
|
||||
_next_position_in_partition = position_in_partition::for_partition_start();
|
||||
}
|
||||
}
|
||||
|
||||
future<> evictable_reader::fast_forward_to(const dht::partition_range& pr, db::timeout_clock::time_point timeout) {
|
||||
_pr = ≺
|
||||
_last_pkey.reset();
|
||||
_next_position_in_partition = position_in_partition::for_partition_start();
|
||||
clear_buffer();
|
||||
_end_of_stream = false;
|
||||
|
||||
if (_reader) {
|
||||
return _reader->fast_forward_to(pr, timeout);
|
||||
}
|
||||
if (!_reader_created || !_irh) {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
if (auto reader_opt = try_resume()) {
|
||||
auto f = reader_opt->fast_forward_to(pr, timeout);
|
||||
return f.then([this, reader = std::move(*reader_opt)] () mutable {
|
||||
maybe_pause(std::move(reader));
|
||||
});
|
||||
}
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
evictable_reader_handle::evictable_reader_handle(evictable_reader& r) : _r(&r)
|
||||
{ }
|
||||
|
||||
void evictable_reader_handle::evictable_reader_handle::pause() {
|
||||
_r->pause();
|
||||
}
|
||||
|
||||
flat_mutation_reader make_auto_paused_evictable_reader(
|
||||
mutation_source ms,
|
||||
schema_ptr schema,
|
||||
reader_permit permit,
|
||||
const dht::partition_range& pr,
|
||||
const query::partition_slice& ps,
|
||||
const io_priority_class& pc,
|
||||
tracing::trace_state_ptr trace_state,
|
||||
mutation_reader::forwarding fwd_mr) {
|
||||
return make_flat_mutation_reader<evictable_reader>(evictable_reader::auto_pause::yes, std::move(ms), std::move(schema), std::move(permit), pr, ps,
|
||||
pc, std::move(trace_state), fwd_mr);
|
||||
}
|
||||
|
||||
std::pair<flat_mutation_reader, evictable_reader_handle> make_manually_paused_evictable_reader(
|
||||
mutation_source ms,
|
||||
schema_ptr schema,
|
||||
reader_permit permit,
|
||||
const dht::partition_range& pr,
|
||||
const query::partition_slice& ps,
|
||||
const io_priority_class& pc,
|
||||
tracing::trace_state_ptr trace_state,
|
||||
mutation_reader::forwarding fwd_mr) {
|
||||
auto reader = std::make_unique<evictable_reader>(evictable_reader::auto_pause::no, std::move(ms), std::move(schema), std::move(permit), pr, ps,
|
||||
pc, std::move(trace_state), fwd_mr);
|
||||
auto handle = evictable_reader_handle(*reader.get());
|
||||
return std::pair(flat_mutation_reader(std::move(reader)), handle);
|
||||
}
|
||||
|
||||
namespace {
|
||||
|
||||
// A special-purpose shard reader.
|
||||
//
|
||||
// Shard reader manages a reader located on a remote shard. It transparently
|
||||
@@ -990,66 +1419,6 @@ namespace {
|
||||
// wrapped into a flat_mutation_reader, as it needs to be managed by a shared
|
||||
// pointer.
|
||||
class shard_reader : public enable_lw_shared_from_this<shard_reader>, public flat_mutation_reader::impl {
|
||||
struct fill_buffer_result {
|
||||
foreign_ptr<std::unique_ptr<const circular_buffer<mutation_fragment>>> buffer;
|
||||
bool end_of_stream = false;
|
||||
|
||||
fill_buffer_result() = default;
|
||||
fill_buffer_result(circular_buffer<mutation_fragment> buffer, bool end_of_stream)
|
||||
: buffer(make_foreign(std::make_unique<const circular_buffer<mutation_fragment>>(std::move(buffer))))
|
||||
, end_of_stream(end_of_stream) {
|
||||
}
|
||||
};
|
||||
|
||||
// Encapsulates all data and logic that is local to the remote shard the
|
||||
// reader lives on.
|
||||
class remote_reader {
|
||||
schema_ptr _schema;
|
||||
reader_lifecycle_policy& _lifecycle_policy;
|
||||
const dht::partition_range* _pr;
|
||||
const query::partition_slice& _ps;
|
||||
const io_priority_class& _pc;
|
||||
tracing::global_trace_state_ptr _trace_state;
|
||||
const mutation_reader::forwarding _fwd_mr;
|
||||
reader_concurrency_semaphore::inactive_read_handle _irh;
|
||||
bool _reader_created = false;
|
||||
bool _drop_partition_start = false;
|
||||
bool _drop_static_row = false;
|
||||
position_in_partition::tri_compare _tri_cmp;
|
||||
|
||||
std::optional<dht::decorated_key> _last_pkey;
|
||||
position_in_partition _next_position_in_partition = position_in_partition::for_partition_start();
|
||||
// These are used when the reader has to be recreated (after having been
|
||||
// evicted while paused) and the range and/or slice it is recreated with
|
||||
// differs from the original ones.
|
||||
std::optional<dht::partition_range> _range_override;
|
||||
std::optional<query::partition_slice> _slice_override;
|
||||
|
||||
private:
|
||||
void update_next_position(flat_mutation_reader& reader, circular_buffer<mutation_fragment>& buffer);
|
||||
void adjust_partition_slice();
|
||||
flat_mutation_reader recreate_reader();
|
||||
flat_mutation_reader resume_or_create_reader();
|
||||
bool should_drop_fragment(const mutation_fragment& mf);
|
||||
future<> do_fill_buffer(flat_mutation_reader& reader, db::timeout_clock::time_point timeout);
|
||||
future<> fill_buffer(flat_mutation_reader& reader, circular_buffer<mutation_fragment>& buffer, db::timeout_clock::time_point timeout);
|
||||
|
||||
public:
|
||||
remote_reader(
|
||||
schema_ptr schema,
|
||||
reader_lifecycle_policy& lifecycle_policy,
|
||||
const dht::partition_range& pr,
|
||||
const query::partition_slice& ps,
|
||||
const io_priority_class& pc,
|
||||
tracing::trace_state_ptr trace_state,
|
||||
mutation_reader::forwarding fwd_mr);
|
||||
future<fill_buffer_result> fill_buffer(const dht::partition_range& pr, bool pending_next_partition, db::timeout_clock::time_point timeout);
|
||||
future<> fast_forward_to(const dht::partition_range& pr, db::timeout_clock::time_point timeout);
|
||||
reader_concurrency_semaphore::inactive_read_handle inactive_read_handle() && {
|
||||
return std::move(_irh);
|
||||
}
|
||||
};
|
||||
|
||||
private:
|
||||
shared_ptr<reader_lifecycle_policy> _lifecycle_policy;
|
||||
const unsigned _shard;
|
||||
@@ -1061,7 +1430,7 @@ private:
|
||||
bool _pending_next_partition = false;
|
||||
bool _stopped = false;
|
||||
std::optional<future<>> _read_ahead;
|
||||
foreign_ptr<std::unique_ptr<remote_reader>> _reader;
|
||||
foreign_ptr<std::unique_ptr<evictable_reader>> _reader;
|
||||
|
||||
private:
|
||||
future<> do_fill_buffer(db::timeout_clock::time_point timeout);
|
||||
@@ -1123,275 +1492,50 @@ void shard_reader::stop() noexcept {
|
||||
|
||||
_lifecycle_policy->destroy_reader(_shard, f.then([this] {
|
||||
return smp::submit_to(_shard, [this] {
|
||||
return make_foreign(std::make_unique<reader_concurrency_semaphore::inactive_read_handle>(std::move(*_reader).inactive_read_handle()));
|
||||
}).then([this] (foreign_ptr<std::unique_ptr<reader_concurrency_semaphore::inactive_read_handle>> irh) {
|
||||
return reader_lifecycle_policy::stopped_reader{std::move(irh), detach_buffer(), _pending_next_partition};
|
||||
auto ret = std::tuple(
|
||||
make_foreign(std::make_unique<reader_concurrency_semaphore::inactive_read_handle>(std::move(*_reader).inactive_read_handle())),
|
||||
make_foreign(std::make_unique<circular_buffer<mutation_fragment>>(_reader->detach_buffer())));
|
||||
_reader.reset();
|
||||
return ret;
|
||||
}).then([this] (std::tuple<foreign_ptr<std::unique_ptr<reader_concurrency_semaphore::inactive_read_handle>>,
|
||||
foreign_ptr<std::unique_ptr<circular_buffer<mutation_fragment>>>> remains) {
|
||||
auto&& [irh, remote_buffer] = remains;
|
||||
auto buffer = detach_buffer();
|
||||
for (const auto& mf : *remote_buffer) {
|
||||
buffer.emplace_back(*_schema, mf); // we are copying from the remote shard.
|
||||
}
|
||||
return reader_lifecycle_policy::stopped_reader{std::move(irh), std::move(buffer), _pending_next_partition};
|
||||
});
|
||||
}).finally([zis = shared_from_this()] {}));
|
||||
}
|
||||
|
||||
void shard_reader::remote_reader::update_next_position(flat_mutation_reader& reader, circular_buffer<mutation_fragment>& buffer) {
|
||||
if (buffer.empty()) {
|
||||
return;
|
||||
}
|
||||
|
||||
auto rbegin = std::reverse_iterator(buffer.end());
|
||||
auto rend = std::reverse_iterator(buffer.begin());
|
||||
if (auto pk_it = std::find_if(rbegin, rend, std::mem_fn(&mutation_fragment::is_partition_start)); pk_it != rend) {
|
||||
_last_pkey = pk_it->as_partition_start().key();
|
||||
}
|
||||
|
||||
const auto last_pos = buffer.back().position();
|
||||
switch (last_pos.region()) {
|
||||
case partition_region::partition_start:
|
||||
_next_position_in_partition = position_in_partition::for_static_row();
|
||||
break;
|
||||
case partition_region::static_row:
|
||||
_next_position_in_partition = position_in_partition::before_all_clustered_rows();
|
||||
break;
|
||||
case partition_region::clustered:
|
||||
if (reader.is_buffer_empty()) {
|
||||
_next_position_in_partition = position_in_partition::after_key(last_pos);
|
||||
} else {
|
||||
const auto& next_frag = reader.peek_buffer();
|
||||
if (next_frag.is_end_of_partition()) {
|
||||
buffer.emplace_back(reader.pop_mutation_fragment());
|
||||
_next_position_in_partition = position_in_partition::for_partition_start();
|
||||
} else {
|
||||
_next_position_in_partition = position_in_partition(next_frag.position());
|
||||
}
|
||||
}
|
||||
break;
|
||||
case partition_region::partition_end:
|
||||
_next_position_in_partition = position_in_partition::for_partition_start();
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
void shard_reader::remote_reader::adjust_partition_slice() {
|
||||
if (!_slice_override) {
|
||||
_slice_override = _ps;
|
||||
}
|
||||
|
||||
auto ranges = _slice_override->default_row_ranges();
|
||||
query::trim_clustering_row_ranges_to(*_schema, ranges, _next_position_in_partition);
|
||||
|
||||
_slice_override->clear_ranges();
|
||||
_slice_override->set_range(*_schema, _last_pkey->key(), std::move(ranges));
|
||||
}
|
||||
|
||||
flat_mutation_reader shard_reader::remote_reader::recreate_reader() {
|
||||
const dht::partition_range* range = _pr;
|
||||
const query::partition_slice* slice = &_ps;
|
||||
|
||||
if (_last_pkey) {
|
||||
bool partition_range_is_inclusive = true;
|
||||
|
||||
switch (_next_position_in_partition.region()) {
|
||||
case partition_region::partition_start:
|
||||
partition_range_is_inclusive = false;
|
||||
break;
|
||||
case partition_region::static_row:
|
||||
_drop_partition_start = true;
|
||||
break;
|
||||
case partition_region::clustered:
|
||||
_drop_partition_start = true;
|
||||
_drop_static_row = true;
|
||||
adjust_partition_slice();
|
||||
slice = &*_slice_override;
|
||||
break;
|
||||
case partition_region::partition_end:
|
||||
partition_range_is_inclusive = false;
|
||||
break;
|
||||
}
|
||||
|
||||
// The original range contained a single partition and we've read it
|
||||
// all. We'd have to create a reader with an empty range that would
|
||||
// immediately be at EOS. This is not possible so just create an empty
|
||||
// reader instead.
|
||||
// This should be extremely rare (who'd create a multishard reader to
|
||||
// read a single partition) but still, let's make sure we handle it
|
||||
// correctly.
|
||||
if (_pr->is_singular() && !partition_range_is_inclusive) {
|
||||
return make_empty_flat_reader(_schema);
|
||||
}
|
||||
|
||||
_range_override = dht::partition_range({dht::partition_range::bound(*_last_pkey, partition_range_is_inclusive)}, _pr->end());
|
||||
range = &*_range_override;
|
||||
}
|
||||
|
||||
return _lifecycle_policy.create_reader(
|
||||
_schema,
|
||||
*range,
|
||||
*slice,
|
||||
_pc,
|
||||
_trace_state,
|
||||
_fwd_mr);
|
||||
}
|
||||
|
||||
flat_mutation_reader shard_reader::remote_reader::resume_or_create_reader() {
|
||||
if (!_reader_created) {
|
||||
auto reader = _lifecycle_policy.create_reader(_schema, *_pr, _ps, _pc, _trace_state, _fwd_mr);
|
||||
_reader_created = true;
|
||||
return reader;
|
||||
}
|
||||
if (auto reader_opt = _lifecycle_policy.try_resume(std::move(_irh))) {
|
||||
return std::move(*reader_opt);
|
||||
}
|
||||
return recreate_reader();
|
||||
}
|
||||
|
||||
bool shard_reader::remote_reader::should_drop_fragment(const mutation_fragment& mf) {
|
||||
if (_drop_partition_start && mf.is_partition_start()) {
|
||||
_drop_partition_start = false;
|
||||
return true;
|
||||
}
|
||||
if (_drop_static_row && mf.is_static_row()) {
|
||||
_drop_static_row = false;
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
future<> shard_reader::remote_reader::do_fill_buffer(flat_mutation_reader& reader, db::timeout_clock::time_point timeout) {
|
||||
if (!_drop_partition_start && !_drop_static_row) {
|
||||
return reader.fill_buffer(timeout);
|
||||
}
|
||||
return repeat([this, &reader, timeout] {
|
||||
return reader.fill_buffer(timeout).then([this, &reader] {
|
||||
while (!reader.is_buffer_empty() && should_drop_fragment(reader.peek_buffer())) {
|
||||
reader.pop_mutation_fragment();
|
||||
}
|
||||
return stop_iteration(reader.is_buffer_full() || reader.is_end_of_stream());
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
future<> shard_reader::remote_reader::fill_buffer(flat_mutation_reader& reader, circular_buffer<mutation_fragment>& buffer,
|
||||
db::timeout_clock::time_point timeout) {
|
||||
return do_fill_buffer(reader, timeout).then([this, &reader, &buffer, timeout] {
|
||||
if (reader.is_buffer_empty()) {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
buffer = reader.detach_buffer();
|
||||
auto stop = [this, &reader, &buffer] {
|
||||
// The only problematic fragment kind is the range tombstone.
|
||||
// All other fragment kinds are safe to end the buffer on, and
|
||||
// are guaranteed to represent progress vs. the last buffer fill.
|
||||
if (!buffer.back().is_range_tombstone()) {
|
||||
return true;
|
||||
}
|
||||
if (reader.is_buffer_empty()) {
|
||||
return reader.is_end_of_stream();
|
||||
}
|
||||
const auto& next_pos = reader.peek_buffer().position();
|
||||
// To ensure safe progress we have to ensure the following:
|
||||
//
|
||||
// _next_position_in_partition < buffer.back().position() < next_pos
|
||||
//
|
||||
// * The first condition is to ensure we made progress since the
|
||||
// last buffer fill. Otherwise we might get into an endless loop if
|
||||
// the reader is recreated after each `fill_buffer()` call.
|
||||
// * The second condition is to ensure we have seen all fragments
|
||||
// with the same position. Otherwise we might jump over those
|
||||
// remaining fragments with the same position as the last
|
||||
// fragment's in the buffer when the reader is recreated.
|
||||
return _tri_cmp(_next_position_in_partition, buffer.back().position()) < 0 && _tri_cmp(buffer.back().position(), next_pos) < 0;
|
||||
};
|
||||
// Read additional fragments until it is safe to stop, if needed.
|
||||
// We have to ensure we stop at a fragment such that if the reader is
|
||||
// evicted and recreated later, we won't be skipping any fragments.
|
||||
// Practically, range tombstones are the only ones that are
|
||||
// problematic to end the buffer on. This is due to the fact range
|
||||
// tombstones can have the same position that multiple following range
|
||||
// tombstones, or a single following clustering row in the stream has.
|
||||
// When a range tombstone is the last in the buffer, we have to continue
|
||||
// to read until we are sure we've read all fragments sharing the same
|
||||
// position, so that we can safely continue reading from after said
|
||||
// position.
|
||||
return do_until(stop, [this, &reader, &buffer, timeout] {
|
||||
if (reader.is_buffer_empty()) {
|
||||
return do_fill_buffer(reader, timeout);
|
||||
}
|
||||
buffer.emplace_back(reader.pop_mutation_fragment());
|
||||
return make_ready_future<>();
|
||||
});
|
||||
}).then([this, &reader, &buffer] {
|
||||
update_next_position(reader, buffer);
|
||||
});
|
||||
}
|
||||
|
||||
shard_reader::remote_reader::remote_reader(
|
||||
schema_ptr schema,
|
||||
reader_lifecycle_policy& lifecycle_policy,
|
||||
const dht::partition_range& pr,
|
||||
const query::partition_slice& ps,
|
||||
const io_priority_class& pc,
|
||||
tracing::trace_state_ptr trace_state,
|
||||
mutation_reader::forwarding fwd_mr)
|
||||
: _schema(std::move(schema))
|
||||
, _lifecycle_policy(lifecycle_policy)
|
||||
, _pr(&pr)
|
||||
, _ps(ps)
|
||||
, _pc(pc)
|
||||
, _trace_state(std::move(trace_state))
|
||||
, _fwd_mr(fwd_mr)
|
||||
, _tri_cmp(*_schema) {
|
||||
}
|
||||
|
||||
future<shard_reader::fill_buffer_result> shard_reader::remote_reader::fill_buffer(const dht::partition_range& pr, bool pending_next_partition,
|
||||
db::timeout_clock::time_point timeout) {
|
||||
// We could have missed a `fast_forward_to()` if the reader wasn't created yet.
|
||||
_pr = ≺
|
||||
if (pending_next_partition) {
|
||||
_next_position_in_partition = position_in_partition::for_partition_start();
|
||||
}
|
||||
return do_with(resume_or_create_reader(), circular_buffer<mutation_fragment>{},
|
||||
[this, pending_next_partition, timeout] (flat_mutation_reader& reader, circular_buffer<mutation_fragment>& buffer) mutable {
|
||||
if (pending_next_partition) {
|
||||
reader.next_partition();
|
||||
}
|
||||
|
||||
return fill_buffer(reader, buffer, timeout).then([this, &reader, &buffer] {
|
||||
const auto eos = reader.is_end_of_stream() && reader.is_buffer_empty();
|
||||
_irh = _lifecycle_policy.pause(std::move(reader));
|
||||
return fill_buffer_result(std::move(buffer), eos);
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
future<> shard_reader::remote_reader::fast_forward_to(const dht::partition_range& pr, db::timeout_clock::time_point timeout) {
|
||||
_pr = ≺
|
||||
_last_pkey.reset();
|
||||
_next_position_in_partition = position_in_partition::for_partition_start();
|
||||
|
||||
if (!_reader_created || !_irh) {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
if (auto reader_opt = _lifecycle_policy.try_resume(std::move(_irh))) {
|
||||
auto f = reader_opt->fast_forward_to(pr, timeout);
|
||||
return f.then([this, reader = std::move(*reader_opt)] () mutable {
|
||||
_irh = _lifecycle_policy.pause(std::move(reader));
|
||||
});
|
||||
}
|
||||
return make_ready_future<>();
|
||||
}
|
||||
|
||||
|
||||
future<> shard_reader::do_fill_buffer(db::timeout_clock::time_point timeout) {
|
||||
auto fill_buf_fut = make_ready_future<fill_buffer_result>();
|
||||
const auto pending_next_partition = std::exchange(_pending_next_partition, false);
|
||||
|
||||
struct reader_and_buffer_fill_result {
|
||||
foreign_ptr<std::unique_ptr<remote_reader>> reader;
|
||||
foreign_ptr<std::unique_ptr<evictable_reader>> reader;
|
||||
fill_buffer_result result;
|
||||
};
|
||||
|
||||
if (!_reader) {
|
||||
fill_buf_fut = smp::submit_to(_shard, [this, gs = global_schema_ptr(_schema), pending_next_partition, timeout] {
|
||||
auto rreader = make_foreign(std::make_unique<remote_reader>(gs.get(), *_lifecycle_policy, *_pr, _ps, _pc, _trace_state, _fwd_mr));
|
||||
auto f = rreader->fill_buffer(*_pr, pending_next_partition, timeout);
|
||||
return f.then([rreader = std::move(rreader)] (fill_buffer_result res) mutable {
|
||||
fill_buf_fut = smp::submit_to(_shard, [this, gs = global_schema_ptr(_schema), timeout] {
|
||||
auto ms = mutation_source([lifecycle_policy = _lifecycle_policy.get()] (
|
||||
schema_ptr s,
|
||||
reader_permit,
|
||||
const dht::partition_range& pr,
|
||||
const query::partition_slice& ps,
|
||||
const io_priority_class& pc,
|
||||
tracing::trace_state_ptr ts,
|
||||
streamed_mutation::forwarding,
|
||||
mutation_reader::forwarding fwd_mr) {
|
||||
return lifecycle_policy->create_reader(std::move(s), pr, ps, pc, std::move(ts), fwd_mr);
|
||||
});
|
||||
auto rreader = make_foreign(std::make_unique<evictable_reader>(evictable_reader::auto_pause::yes, std::move(ms),
|
||||
gs.get(), _lifecycle_policy->semaphore().make_permit(), *_pr, _ps, _pc, _trace_state, _fwd_mr));
|
||||
auto f = rreader->fill_buffer(timeout);
|
||||
return f.then([rreader = std::move(rreader)] () mutable {
|
||||
auto res = fill_buffer_result(rreader->detach_buffer(), rreader->is_end_of_stream());
|
||||
return make_ready_future<reader_and_buffer_fill_result>(reader_and_buffer_fill_result{std::move(rreader), std::move(res)});
|
||||
});
|
||||
}).then([this, timeout] (reader_and_buffer_fill_result res) {
|
||||
@@ -1400,7 +1544,12 @@ future<> shard_reader::do_fill_buffer(db::timeout_clock::time_point timeout) {
|
||||
});
|
||||
} else {
|
||||
fill_buf_fut = smp::submit_to(_shard, [this, pending_next_partition, timeout] () mutable {
|
||||
return _reader->fill_buffer(*_pr, pending_next_partition, timeout);
|
||||
if (pending_next_partition) {
|
||||
_reader->next_partition();
|
||||
}
|
||||
return _reader->fill_buffer(timeout).then([this] {
|
||||
return fill_buffer_result(_reader->detach_buffer(), _reader->is_end_of_stream());
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
@@ -1649,27 +1798,9 @@ future<> multishard_combining_reader::fast_forward_to(position_range pr, db::tim
|
||||
return make_exception_future<>(make_backtraced_exception_ptr<std::bad_function_call>());
|
||||
}
|
||||
|
||||
namespace {
|
||||
|
||||
class inactive_shard_read : public reader_concurrency_semaphore::inactive_read {
|
||||
flat_mutation_reader_opt _reader;
|
||||
public:
|
||||
inactive_shard_read(flat_mutation_reader reader)
|
||||
: _reader(std::move(reader)) {
|
||||
}
|
||||
flat_mutation_reader reader() && {
|
||||
return std::move(*_reader);
|
||||
}
|
||||
virtual void evict() override {
|
||||
_reader = {};
|
||||
}
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
reader_concurrency_semaphore::inactive_read_handle
|
||||
reader_lifecycle_policy::pause(reader_concurrency_semaphore& sem, flat_mutation_reader reader) {
|
||||
return sem.register_inactive_read(std::make_unique<inactive_shard_read>(std::move(reader)));
|
||||
return sem.register_inactive_read(std::make_unique<inactive_evictable_reader>(std::move(reader)));
|
||||
}
|
||||
|
||||
flat_mutation_reader_opt
|
||||
@@ -1678,7 +1809,7 @@ reader_lifecycle_policy::try_resume(reader_concurrency_semaphore& sem, reader_co
|
||||
if (!ir_ptr) {
|
||||
return {};
|
||||
}
|
||||
auto& ir = static_cast<inactive_shard_read&>(*ir_ptr);
|
||||
auto& ir = static_cast<inactive_evictable_reader&>(*ir_ptr);
|
||||
return std::move(ir).reader();
|
||||
}
|
||||
|
||||
|
||||
@@ -372,6 +372,64 @@ flat_mutation_reader make_foreign_reader(schema_ptr schema,
|
||||
foreign_ptr<std::unique_ptr<flat_mutation_reader>> reader,
|
||||
streamed_mutation::forwarding fwd_sm = streamed_mutation::forwarding::no);
|
||||
|
||||
/// Make an auto-paused evictable reader.
|
||||
///
|
||||
/// The reader is paused after each use, that is after each call to any of its
|
||||
/// members that cause actual reading to be done (`fill_buffer()` and
|
||||
/// `fast_forward_to()`). When paused, the reader is made evictable, that it is
|
||||
/// it is registered with reader concurrency semaphore as an inactive read.
|
||||
/// The reader is resumed automatically on the next use. If it was evicted, it
|
||||
/// will be recreated at the position it left off reading. This is all
|
||||
/// transparent to its user.
|
||||
/// Parameters passed by reference have to be kept alive while the reader is
|
||||
/// alive.
|
||||
flat_mutation_reader make_auto_paused_evictable_reader(
|
||||
mutation_source ms,
|
||||
schema_ptr schema,
|
||||
reader_permit permit,
|
||||
const dht::partition_range& pr,
|
||||
const query::partition_slice& ps,
|
||||
const io_priority_class& pc,
|
||||
tracing::trace_state_ptr trace_state,
|
||||
mutation_reader::forwarding fwd_mr);
|
||||
|
||||
class evictable_reader;
|
||||
|
||||
class evictable_reader_handle {
|
||||
friend std::pair<flat_mutation_reader, evictable_reader_handle> make_manually_paused_evictable_reader(mutation_source, schema_ptr, reader_permit,
|
||||
const dht::partition_range&, const query::partition_slice&, const io_priority_class&, tracing::trace_state_ptr, mutation_reader::forwarding);
|
||||
|
||||
private:
|
||||
evictable_reader* _r;
|
||||
|
||||
private:
|
||||
explicit evictable_reader_handle(evictable_reader& r);
|
||||
|
||||
public:
|
||||
void pause();
|
||||
};
|
||||
|
||||
/// Make a manually-paused evictable reader.
|
||||
///
|
||||
/// The reader can be paused via the evictable reader handle when desired. The
|
||||
/// intended usage is subsequent reads done in bursts, after which the reader is
|
||||
/// not used for some time. When paused, the reader is made evictable, that is,
|
||||
/// it is registered with reader concurrency semaphore as an inactive read.
|
||||
/// The reader is resumed automatically on the next use. If it was evicted, it
|
||||
/// will be recreated at the position it left off reading. This is all
|
||||
/// transparent to its user.
|
||||
/// Parameters passed by reference have to be kept alive while the reader is
|
||||
/// alive.
|
||||
std::pair<flat_mutation_reader, evictable_reader_handle> make_manually_paused_evictable_reader(
|
||||
mutation_source ms,
|
||||
schema_ptr schema,
|
||||
reader_permit permit,
|
||||
const dht::partition_range& pr,
|
||||
const query::partition_slice& ps,
|
||||
const io_priority_class& pc,
|
||||
tracing::trace_state_ptr trace_state,
|
||||
mutation_reader::forwarding fwd_mr);
|
||||
|
||||
/// Reader lifecycle policy for the mulitshard combining reader.
|
||||
///
|
||||
/// This policy is expected to make sure any additional resource the readers
|
||||
|
||||
@@ -372,6 +372,7 @@ private:
|
||||
std::optional<utils::phased_barrier::operation> _local_read_op;
|
||||
// Local reader or multishard reader to read the range
|
||||
flat_mutation_reader _reader;
|
||||
std::optional<evictable_reader_handle> _reader_handle;
|
||||
// Current partition read from disk
|
||||
lw_shared_ptr<const decorated_key_with_hash> _current_dk;
|
||||
|
||||
@@ -390,32 +391,49 @@ public:
|
||||
, _sharder(remote_sharder, range, remote_shard)
|
||||
, _seed(seed)
|
||||
, _local_read_op(local_reader ? std::optional(cf.read_in_progress()) : std::nullopt)
|
||||
, _reader(make_reader(db, cf, local_reader)) {
|
||||
}
|
||||
|
||||
private:
|
||||
flat_mutation_reader
|
||||
make_reader(seastar::sharded<database>& db,
|
||||
column_family& cf,
|
||||
is_local_reader local_reader) {
|
||||
, _reader(nullptr) {
|
||||
if (local_reader) {
|
||||
return cf.make_streaming_reader(_schema, _range);
|
||||
auto ms = mutation_source([&cf] (
|
||||
schema_ptr s,
|
||||
reader_permit,
|
||||
const dht::partition_range& pr,
|
||||
const query::partition_slice& ps,
|
||||
const io_priority_class& pc,
|
||||
tracing::trace_state_ptr,
|
||||
streamed_mutation::forwarding,
|
||||
mutation_reader::forwarding fwd_mr) {
|
||||
return cf.make_streaming_reader(std::move(s), pr, ps, fwd_mr);
|
||||
});
|
||||
std::tie(_reader, _reader_handle) = make_manually_paused_evictable_reader(
|
||||
std::move(ms),
|
||||
_schema,
|
||||
cf.streaming_read_concurrency_semaphore().make_permit(),
|
||||
_range,
|
||||
_schema->full_slice(),
|
||||
service::get_local_streaming_priority(),
|
||||
{},
|
||||
mutation_reader::forwarding::no);
|
||||
} else {
|
||||
_reader = make_multishard_streaming_reader(db, _schema, [this] {
|
||||
auto shard_range = _sharder.next();
|
||||
if (shard_range) {
|
||||
return std::optional<dht::partition_range>(dht::to_partition_range(*shard_range));
|
||||
}
|
||||
return std::optional<dht::partition_range>();
|
||||
});
|
||||
}
|
||||
return make_multishard_streaming_reader(db, _schema, [this] {
|
||||
auto shard_range = _sharder.next();
|
||||
if (shard_range) {
|
||||
return std::optional<dht::partition_range>(dht::to_partition_range(*shard_range));
|
||||
}
|
||||
return std::optional<dht::partition_range>();
|
||||
});
|
||||
}
|
||||
|
||||
public:
|
||||
future<mutation_fragment_opt>
|
||||
read_mutation_fragment() {
|
||||
return _reader(db::no_timeout);
|
||||
}
|
||||
|
||||
void on_end_of_stream() {
|
||||
_reader = make_empty_flat_reader(_schema);
|
||||
_reader_handle.reset();
|
||||
}
|
||||
|
||||
lw_shared_ptr<const decorated_key_with_hash>& get_current_dk() {
|
||||
return _current_dk;
|
||||
}
|
||||
@@ -434,6 +452,11 @@ public:
|
||||
}
|
||||
}
|
||||
|
||||
void pause() {
|
||||
if (_reader_handle) {
|
||||
_reader_handle->pause();
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
class repair_writer {
|
||||
@@ -1015,11 +1038,7 @@ private:
|
||||
return repair_hash(h.finalize_uint64());
|
||||
}
|
||||
|
||||
stop_iteration handle_mutation_fragment(mutation_fragment_opt mfopt, size_t& cur_size, size_t& new_rows_size, std::list<repair_row>& cur_rows) {
|
||||
if (!mfopt) {
|
||||
return stop_iteration::yes;
|
||||
}
|
||||
mutation_fragment& mf = *mfopt;
|
||||
stop_iteration handle_mutation_fragment(mutation_fragment& mf, size_t& cur_size, size_t& new_rows_size, std::list<repair_row>& cur_rows) {
|
||||
if (mf.is_partition_start()) {
|
||||
auto& start = mf.as_partition_start();
|
||||
_repair_reader.set_current_dk(start.key());
|
||||
@@ -1054,9 +1073,18 @@ private:
|
||||
}
|
||||
_gate.check();
|
||||
return _repair_reader.read_mutation_fragment().then([this, &cur_size, &new_rows_size, &cur_rows] (mutation_fragment_opt mfopt) mutable {
|
||||
return handle_mutation_fragment(std::move(mfopt), cur_size, new_rows_size, cur_rows);
|
||||
if (!mfopt) {
|
||||
_repair_reader.on_end_of_stream();
|
||||
return stop_iteration::yes;
|
||||
}
|
||||
return handle_mutation_fragment(*mfopt, cur_size, new_rows_size, cur_rows);
|
||||
});
|
||||
}).then([&cur_rows, &new_rows_size] () mutable {
|
||||
}).then_wrapped([this, &cur_rows, &new_rows_size] (future<> fut) mutable {
|
||||
if (fut.failed()) {
|
||||
_repair_reader.on_end_of_stream();
|
||||
return make_exception_future<std::list<repair_row>, size_t>(fut.get_exception());
|
||||
}
|
||||
_repair_reader.pause();
|
||||
return make_ready_future<std::list<repair_row>, size_t>(std::move(cur_rows), new_rows_size);
|
||||
});
|
||||
});
|
||||
|
||||
@@ -42,6 +42,7 @@
|
||||
#include "test/lib/dummy_sharder.hh"
|
||||
#include "test/lib/reader_lifecycle_policy.hh"
|
||||
#include "test/lib/reader_permit.hh"
|
||||
#include "test/lib/random_utils.hh"
|
||||
|
||||
#include "dht/sharder.hh"
|
||||
#include "mutation_reader.hh"
|
||||
@@ -2742,3 +2743,110 @@ SEASTAR_THREAD_TEST_CASE(test_compacting_reader_next_partition) {
|
||||
}
|
||||
reader_assertions.produces_end_of_stream();
|
||||
}
|
||||
|
||||
SEASTAR_THREAD_TEST_CASE(test_auto_paused_evictable_reader_is_mutation_source) {
|
||||
auto make_populate = [] (schema_ptr s, const std::vector<mutation>& mutations, gc_clock::time_point query_time) {
|
||||
auto mt = make_lw_shared<memtable>(s);
|
||||
for (auto& mut : mutations) {
|
||||
mt->apply(mut);
|
||||
}
|
||||
return mutation_source([=] (
|
||||
schema_ptr s,
|
||||
reader_permit permit,
|
||||
const dht::partition_range& range,
|
||||
const query::partition_slice& slice,
|
||||
const io_priority_class& pc,
|
||||
tracing::trace_state_ptr trace_state,
|
||||
streamed_mutation::forwarding fwd_sm,
|
||||
mutation_reader::forwarding fwd_mr) mutable {
|
||||
auto mr = make_auto_paused_evictable_reader(mt->as_data_source(), std::move(s), permit, range, slice, pc, std::move(trace_state), fwd_mr);
|
||||
if (fwd_sm == streamed_mutation::forwarding::yes) {
|
||||
return make_forwardable(std::move(mr));
|
||||
}
|
||||
return mr;
|
||||
});
|
||||
};
|
||||
|
||||
run_mutation_source_tests(make_populate);
|
||||
}
|
||||
|
||||
SEASTAR_THREAD_TEST_CASE(test_manual_paused_evictable_reader_is_mutation_source) {
|
||||
class maybe_pausing_reader : public flat_mutation_reader::impl {
|
||||
flat_mutation_reader _reader;
|
||||
std::optional<evictable_reader_handle> _handle;
|
||||
|
||||
private:
|
||||
void maybe_pause() {
|
||||
if (!tests::random::get_int(0, 4)) {
|
||||
_handle->pause();
|
||||
}
|
||||
}
|
||||
|
||||
public:
|
||||
maybe_pausing_reader(
|
||||
memtable& mt,
|
||||
reader_permit permit,
|
||||
const dht::partition_range& pr,
|
||||
const query::partition_slice& ps,
|
||||
const io_priority_class& pc,
|
||||
tracing::trace_state_ptr trace_state,
|
||||
mutation_reader::forwarding fwd_mr)
|
||||
: impl(mt.schema()), _reader(nullptr) {
|
||||
std::tie(_reader, _handle) = make_manually_paused_evictable_reader(mt.as_data_source(), mt.schema(), permit, pr, ps, pc,
|
||||
std::move(trace_state), fwd_mr);
|
||||
}
|
||||
virtual future<> fill_buffer(db::timeout_clock::time_point timeout) override {
|
||||
return _reader.fill_buffer(timeout).then([this] {
|
||||
_end_of_stream = _reader.is_end_of_stream();
|
||||
_reader.move_buffer_content_to(*this);
|
||||
}).then([this] {
|
||||
maybe_pause();
|
||||
});
|
||||
}
|
||||
virtual void next_partition() override {
|
||||
clear_buffer_to_next_partition();
|
||||
if (!is_buffer_empty()) {
|
||||
return;
|
||||
}
|
||||
_end_of_stream = false;
|
||||
_reader.next_partition();
|
||||
}
|
||||
virtual future<> fast_forward_to(const dht::partition_range& pr, db::timeout_clock::time_point timeout) override {
|
||||
clear_buffer();
|
||||
_end_of_stream = false;
|
||||
return _reader.fast_forward_to(pr, timeout).then([this] {
|
||||
maybe_pause();
|
||||
});
|
||||
}
|
||||
virtual future<> fast_forward_to(position_range pr, db::timeout_clock::time_point timeout) override {
|
||||
throw_with_backtrace<std::bad_function_call>();
|
||||
}
|
||||
virtual size_t buffer_size() const override {
|
||||
return flat_mutation_reader::impl::buffer_size() + _reader.buffer_size();
|
||||
}
|
||||
};
|
||||
|
||||
auto make_populate = [] (schema_ptr s, const std::vector<mutation>& mutations, gc_clock::time_point query_time) {
|
||||
auto mt = make_lw_shared<memtable>(s);
|
||||
for (auto& mut : mutations) {
|
||||
mt->apply(mut);
|
||||
}
|
||||
return mutation_source([=] (
|
||||
schema_ptr s,
|
||||
reader_permit permit,
|
||||
const dht::partition_range& range,
|
||||
const query::partition_slice& slice,
|
||||
const io_priority_class& pc,
|
||||
tracing::trace_state_ptr trace_state,
|
||||
streamed_mutation::forwarding fwd_sm,
|
||||
mutation_reader::forwarding fwd_mr) mutable {
|
||||
auto mr = make_flat_mutation_reader<maybe_pausing_reader>(*mt, std::move(permit), range, slice, pc, std::move(trace_state), fwd_mr);
|
||||
if (fwd_sm == streamed_mutation::forwarding::yes) {
|
||||
return make_forwardable(std::move(mr));
|
||||
}
|
||||
return mr;
|
||||
});
|
||||
};
|
||||
|
||||
run_mutation_source_tests(make_populate);
|
||||
}
|
||||
|
||||
@@ -91,6 +91,7 @@ private:
|
||||
std::optional<const dht::partition_range> range;
|
||||
std::optional<const query::partition_slice> slice;
|
||||
|
||||
reader_context() = default;
|
||||
reader_context(dht::partition_range range, query::partition_slice slice) : range(std::move(range)), slice(std::move(slice)) {
|
||||
}
|
||||
};
|
||||
@@ -143,18 +144,21 @@ public:
|
||||
}
|
||||
virtual reader_concurrency_semaphore& semaphore() override {
|
||||
const auto shard = this_shard_id();
|
||||
if (!_contexts[shard]->semaphore) {
|
||||
if (_evict_paused_readers) {
|
||||
_contexts[shard]->semaphore = std::make_unique<reader_concurrency_semaphore>(0, std::numeric_limits<ssize_t>::max(),
|
||||
format("reader_concurrency_semaphore @shard_id={}", shard));
|
||||
_contexts[shard]->permit = _contexts[shard]->semaphore->make_permit();
|
||||
// Add a waiter, so that all registered inactive reads are
|
||||
// immediately evicted.
|
||||
// We don't care about the returned future.
|
||||
_contexts[shard]->wait_future = _contexts[shard]->permit->wait_admission(1, db::no_timeout);
|
||||
} else {
|
||||
_contexts[shard]->semaphore = std::make_unique<reader_concurrency_semaphore>(reader_concurrency_semaphore::no_limits{});
|
||||
}
|
||||
if (!_contexts[shard]) {
|
||||
_contexts[shard] = make_foreign(std::make_unique<reader_context>());
|
||||
} else if (_contexts[shard]->semaphore) {
|
||||
return *_contexts[shard]->semaphore;
|
||||
}
|
||||
if (_evict_paused_readers) {
|
||||
_contexts[shard]->semaphore = std::make_unique<reader_concurrency_semaphore>(0, std::numeric_limits<ssize_t>::max(),
|
||||
format("reader_concurrency_semaphore @shard_id={}", shard));
|
||||
_contexts[shard]->permit = _contexts[shard]->semaphore->make_permit();
|
||||
// Add a waiter, so that all registered inactive reads are
|
||||
// immediately evicted.
|
||||
// We don't care about the returned future.
|
||||
_contexts[shard]->wait_future = _contexts[shard]->permit->wait_admission(1, db::no_timeout);
|
||||
} else {
|
||||
_contexts[shard]->semaphore = std::make_unique<reader_concurrency_semaphore>(reader_concurrency_semaphore::no_limits{});
|
||||
}
|
||||
return *_contexts[shard]->semaphore;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user