From 4ecf061c9012c74da60c4e1ea710cccd7a6dc807 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Botond=20D=C3=A9nes?= Date: Thu, 27 May 2021 13:39:08 +0300 Subject: [PATCH] reader_lifecycle_policy implementations: fix indentation Left broken from the previous patch. --- database.cc | 8 +-- multishard_mutation_query.cc | 92 ++++++++++++++--------------- test/lib/reader_lifecycle_policy.hh | 28 ++++----- 3 files changed, 64 insertions(+), 64 deletions(-) diff --git a/database.cc b/database.cc index 1a3004539d..f621d86e2d 100644 --- a/database.cc +++ b/database.cc @@ -2309,10 +2309,10 @@ flat_mutation_reader make_multishard_streaming_reader(distributed& db, return cf.make_streaming_reader(std::move(schema), *_contexts[shard].range, slice, fwd_mr); } virtual future<> destroy_reader(shard_id shard, stopped_reader reader) noexcept override { - return smp::submit_to(shard, [ctx = std::move(_contexts[shard]), handle = std::move(reader.handle)] () mutable { - auto reader_opt = ctx.semaphore->unregister_inactive_read(std::move(*handle)); - return reader_opt ? reader_opt->close() : make_ready_future<>(); - }); + return smp::submit_to(shard, [ctx = std::move(_contexts[shard]), handle = std::move(reader.handle)] () mutable { + auto reader_opt = ctx.semaphore->unregister_inactive_read(std::move(*handle)); + return reader_opt ? reader_opt->close() : make_ready_future<>(); + }); } virtual reader_concurrency_semaphore& semaphore() override { const auto shard = this_shard_id(); diff --git a/multishard_mutation_query.cc b/multishard_mutation_query.cc index f5dbf63a68..fa377d1e00 100644 --- a/multishard_mutation_query.cc +++ b/multishard_mutation_query.cc @@ -322,39 +322,39 @@ flat_mutation_reader read_context::create_reader( } future<> read_context::destroy_reader(shard_id shard, stopped_reader reader) noexcept { - auto& rm = _readers[shard]; + auto& rm = _readers[shard]; - if (rm.state == reader_state::used) { - rm.state = reader_state::saving; - rm.handle = std::move(reader.handle); - rm.buffer = std::move(reader.unconsumed_fragments); - } else { - mmq_log.warn( - "Unexpected request to dismantle reader in state `{}` for shard {}." - " Reader was not created nor is in the process of being created.", - reader_state_to_string(rm.state), - shard); - } + if (rm.state == reader_state::used) { + rm.state = reader_state::saving; + rm.handle = std::move(reader.handle); + rm.buffer = std::move(reader.unconsumed_fragments); + } else { + mmq_log.warn( + "Unexpected request to dismantle reader in state `{}` for shard {}." + " Reader was not created nor is in the process of being created.", + reader_state_to_string(rm.state), + shard); + } return make_ready_future<>(); } future<> read_context::stop() { - return parallel_for_each(smp::all_cpus(), [this] (unsigned shard) { - if (_readers[shard].rparts) { - return _db.invoke_on(shard, [rm = std::move(_readers[shard])] (database& db) mutable { - auto rparts = rm.rparts.release(); - auto irh = rm.handle.release(); - if (*irh) { - auto reader_opt = rparts->permit.semaphore().unregister_inactive_read(std::move(*rm.handle)); - if (reader_opt) { - return reader_opt->close().then([rparts = std::move(rparts)] { }); - } + return parallel_for_each(smp::all_cpus(), [this] (unsigned shard) { + if (_readers[shard].rparts) { + return _db.invoke_on(shard, [rm = std::move(_readers[shard])] (database& db) mutable { + auto rparts = rm.rparts.release(); + auto irh = rm.handle.release(); + if (*irh) { + auto reader_opt = rparts->permit.semaphore().unregister_inactive_read(std::move(*rm.handle)); + if (reader_opt) { + return reader_opt->close().then([rparts = std::move(rparts)] { }); } - return make_ready_future<>(); - }); - } - return make_ready_future<>(); - }); + } + return make_ready_future<>(); + }); + } + return make_ready_future<>(); + }); } read_context::dismantle_buffer_stats read_context::dismantle_combined_buffer(flat_mutation_reader::tracked_buffer combined_buffer, @@ -546,32 +546,32 @@ future<> read_context::save_readers(flat_mutation_reader::tracked_buffer unconsu return make_ready_future<>(); } - auto last_pkey = compaction_state.partition_start.key(); + auto last_pkey = compaction_state.partition_start.key(); - // Ensure all readers have engaged reader_meta::buffer member. - for (auto& rm : _readers) { - if (!rm.buffer) { - rm.buffer.emplace(_permit); - } + // Ensure all readers have engaged reader_meta::buffer member. + for (auto& rm : _readers) { + if (!rm.buffer) { + rm.buffer.emplace(_permit); } + } - const auto cb_stats = dismantle_combined_buffer(std::move(unconsumed_buffer), last_pkey); - tracing::trace(_trace_state, "Dismantled combined buffer: {}", cb_stats); + const auto cb_stats = dismantle_combined_buffer(std::move(unconsumed_buffer), last_pkey); + tracing::trace(_trace_state, "Dismantled combined buffer: {}", cb_stats); - const auto cs_stats = dismantle_compaction_state(std::move(compaction_state)); - tracing::trace(_trace_state, "Dismantled compaction state: {}", cs_stats); + const auto cs_stats = dismantle_compaction_state(std::move(compaction_state)); + tracing::trace(_trace_state, "Dismantled compaction state: {}", cs_stats); - return do_with(std::move(last_pkey), std::move(last_ckey), [this] (const dht::decorated_key& last_pkey, - const std::optional& last_ckey) { - return parallel_for_each(boost::irange(0u, smp::count), [this, &last_pkey, &last_ckey] (shard_id shard) { - auto& rm = _readers[shard]; - if (rm.state == reader_state::successful_lookup || rm.state == reader_state::saving) { - return save_reader(shard, last_pkey, last_ckey); - } + return do_with(std::move(last_pkey), std::move(last_ckey), [this] (const dht::decorated_key& last_pkey, + const std::optional& last_ckey) { + return parallel_for_each(boost::irange(0u, smp::count), [this, &last_pkey, &last_ckey] (shard_id shard) { + auto& rm = _readers[shard]; + if (rm.state == reader_state::successful_lookup || rm.state == reader_state::saving) { + return save_reader(shard, last_pkey, last_ckey); + } - return make_ready_future<>(); - }); + return make_ready_future<>(); }); + }); } namespace { diff --git a/test/lib/reader_lifecycle_policy.hh b/test/lib/reader_lifecycle_policy.hh index 2dfb54a03b..5943ef70de 100644 --- a/test/lib/reader_lifecycle_policy.hh +++ b/test/lib/reader_lifecycle_policy.hh @@ -145,20 +145,20 @@ public: } virtual future<> destroy_reader(shard_id shard, stopped_reader reader) noexcept override { // waited via _operation_gate - return smp::submit_to(shard, [handle = std::move(reader.handle), ctx = &*_contexts[shard]] () mutable { - auto reader_opt = ctx->semaphore->unregister_inactive_read(std::move(*handle)); - auto ret = reader_opt ? reader_opt->close() : make_ready_future<>(); - ctx->semaphore->broken(); - if (ctx->wait_future) { - ret = ret.then([ctx = std::move(ctx)] () mutable { - return ctx->wait_future->then_wrapped([ctx = std::move(ctx)] (future f) mutable { - f.ignore_ready_future(); - ctx->permit.reset(); // make sure it's destroyed before the semaphore - }); - }); - } - return std::move(ret); - }); + return smp::submit_to(shard, [handle = std::move(reader.handle), ctx = &*_contexts[shard]] () mutable { + auto reader_opt = ctx->semaphore->unregister_inactive_read(std::move(*handle)); + auto ret = reader_opt ? reader_opt->close() : make_ready_future<>(); + ctx->semaphore->broken(); + if (ctx->wait_future) { + ret = ret.then([ctx = std::move(ctx)] () mutable { + return ctx->wait_future->then_wrapped([ctx = std::move(ctx)] (future f) mutable { + f.ignore_ready_future(); + ctx->permit.reset(); // make sure it's destroyed before the semaphore + }); + }); + } + return std::move(ret); + }); } virtual reader_concurrency_semaphore& semaphore() override { const auto shard = this_shard_id();