reader_lifecycle_policy implementations: fix indentation

Left broken from the previous patch.
This commit is contained in:
Botond Dénes
2021-05-27 13:39:08 +03:00
parent a7e59d3e2c
commit 4ecf061c90
3 changed files with 64 additions and 64 deletions

View File

@@ -2309,10 +2309,10 @@ flat_mutation_reader make_multishard_streaming_reader(distributed<database>& db,
return cf.make_streaming_reader(std::move(schema), *_contexts[shard].range, slice, fwd_mr);
}
virtual future<> destroy_reader(shard_id shard, stopped_reader reader) noexcept override {
return smp::submit_to(shard, [ctx = std::move(_contexts[shard]), handle = std::move(reader.handle)] () mutable {
auto reader_opt = ctx.semaphore->unregister_inactive_read(std::move(*handle));
return reader_opt ? reader_opt->close() : make_ready_future<>();
});
return smp::submit_to(shard, [ctx = std::move(_contexts[shard]), handle = std::move(reader.handle)] () mutable {
auto reader_opt = ctx.semaphore->unregister_inactive_read(std::move(*handle));
return reader_opt ? reader_opt->close() : make_ready_future<>();
});
}
virtual reader_concurrency_semaphore& semaphore() override {
const auto shard = this_shard_id();

View File

@@ -322,39 +322,39 @@ flat_mutation_reader read_context::create_reader(
}
future<> read_context::destroy_reader(shard_id shard, stopped_reader reader) noexcept {
auto& rm = _readers[shard];
auto& rm = _readers[shard];
if (rm.state == reader_state::used) {
rm.state = reader_state::saving;
rm.handle = std::move(reader.handle);
rm.buffer = std::move(reader.unconsumed_fragments);
} else {
mmq_log.warn(
"Unexpected request to dismantle reader in state `{}` for shard {}."
" Reader was not created nor is in the process of being created.",
reader_state_to_string(rm.state),
shard);
}
if (rm.state == reader_state::used) {
rm.state = reader_state::saving;
rm.handle = std::move(reader.handle);
rm.buffer = std::move(reader.unconsumed_fragments);
} else {
mmq_log.warn(
"Unexpected request to dismantle reader in state `{}` for shard {}."
" Reader was not created nor is in the process of being created.",
reader_state_to_string(rm.state),
shard);
}
return make_ready_future<>();
}
future<> read_context::stop() {
return parallel_for_each(smp::all_cpus(), [this] (unsigned shard) {
if (_readers[shard].rparts) {
return _db.invoke_on(shard, [rm = std::move(_readers[shard])] (database& db) mutable {
auto rparts = rm.rparts.release();
auto irh = rm.handle.release();
if (*irh) {
auto reader_opt = rparts->permit.semaphore().unregister_inactive_read(std::move(*rm.handle));
if (reader_opt) {
return reader_opt->close().then([rparts = std::move(rparts)] { });
}
return parallel_for_each(smp::all_cpus(), [this] (unsigned shard) {
if (_readers[shard].rparts) {
return _db.invoke_on(shard, [rm = std::move(_readers[shard])] (database& db) mutable {
auto rparts = rm.rparts.release();
auto irh = rm.handle.release();
if (*irh) {
auto reader_opt = rparts->permit.semaphore().unregister_inactive_read(std::move(*rm.handle));
if (reader_opt) {
return reader_opt->close().then([rparts = std::move(rparts)] { });
}
return make_ready_future<>();
});
}
return make_ready_future<>();
});
}
return make_ready_future<>();
});
}
return make_ready_future<>();
});
}
read_context::dismantle_buffer_stats read_context::dismantle_combined_buffer(flat_mutation_reader::tracked_buffer combined_buffer,
@@ -546,32 +546,32 @@ future<> read_context::save_readers(flat_mutation_reader::tracked_buffer unconsu
return make_ready_future<>();
}
auto last_pkey = compaction_state.partition_start.key();
auto last_pkey = compaction_state.partition_start.key();
// Ensure all readers have engaged reader_meta::buffer member.
for (auto& rm : _readers) {
if (!rm.buffer) {
rm.buffer.emplace(_permit);
}
// Ensure all readers have engaged reader_meta::buffer member.
for (auto& rm : _readers) {
if (!rm.buffer) {
rm.buffer.emplace(_permit);
}
}
const auto cb_stats = dismantle_combined_buffer(std::move(unconsumed_buffer), last_pkey);
tracing::trace(_trace_state, "Dismantled combined buffer: {}", cb_stats);
const auto cb_stats = dismantle_combined_buffer(std::move(unconsumed_buffer), last_pkey);
tracing::trace(_trace_state, "Dismantled combined buffer: {}", cb_stats);
const auto cs_stats = dismantle_compaction_state(std::move(compaction_state));
tracing::trace(_trace_state, "Dismantled compaction state: {}", cs_stats);
const auto cs_stats = dismantle_compaction_state(std::move(compaction_state));
tracing::trace(_trace_state, "Dismantled compaction state: {}", cs_stats);
return do_with(std::move(last_pkey), std::move(last_ckey), [this] (const dht::decorated_key& last_pkey,
const std::optional<clustering_key_prefix>& last_ckey) {
return parallel_for_each(boost::irange(0u, smp::count), [this, &last_pkey, &last_ckey] (shard_id shard) {
auto& rm = _readers[shard];
if (rm.state == reader_state::successful_lookup || rm.state == reader_state::saving) {
return save_reader(shard, last_pkey, last_ckey);
}
return do_with(std::move(last_pkey), std::move(last_ckey), [this] (const dht::decorated_key& last_pkey,
const std::optional<clustering_key_prefix>& last_ckey) {
return parallel_for_each(boost::irange(0u, smp::count), [this, &last_pkey, &last_ckey] (shard_id shard) {
auto& rm = _readers[shard];
if (rm.state == reader_state::successful_lookup || rm.state == reader_state::saving) {
return save_reader(shard, last_pkey, last_ckey);
}
return make_ready_future<>();
});
return make_ready_future<>();
});
});
}
namespace {

View File

@@ -145,20 +145,20 @@ public:
}
virtual future<> destroy_reader(shard_id shard, stopped_reader reader) noexcept override {
// waited via _operation_gate
return smp::submit_to(shard, [handle = std::move(reader.handle), ctx = &*_contexts[shard]] () mutable {
auto reader_opt = ctx->semaphore->unregister_inactive_read(std::move(*handle));
auto ret = reader_opt ? reader_opt->close() : make_ready_future<>();
ctx->semaphore->broken();
if (ctx->wait_future) {
ret = ret.then([ctx = std::move(ctx)] () mutable {
return ctx->wait_future->then_wrapped([ctx = std::move(ctx)] (future<reader_permit::resource_units> f) mutable {
f.ignore_ready_future();
ctx->permit.reset(); // make sure it's destroyed before the semaphore
});
});
}
return std::move(ret);
});
return smp::submit_to(shard, [handle = std::move(reader.handle), ctx = &*_contexts[shard]] () mutable {
auto reader_opt = ctx->semaphore->unregister_inactive_read(std::move(*handle));
auto ret = reader_opt ? reader_opt->close() : make_ready_future<>();
ctx->semaphore->broken();
if (ctx->wait_future) {
ret = ret.then([ctx = std::move(ctx)] () mutable {
return ctx->wait_future->then_wrapped([ctx = std::move(ctx)] (future<reader_permit::resource_units> f) mutable {
f.ignore_ready_future();
ctx->permit.reset(); // make sure it's destroyed before the semaphore
});
});
}
return std::move(ret);
});
}
virtual reader_concurrency_semaphore& semaphore() override {
const auto shard = this_shard_id();