From e545b382bdd5aeda313ed2d42e8c2d6f6595f640 Mon Sep 17 00:00:00 2001 From: Calle Wilund Date: Mon, 28 Jun 2021 11:31:48 +0000 Subject: [PATCH] commitlog: coroutinize segment::allocate --- db/commitlog/commitlog.cc | 221 +++++++++++++++++++------------------- 1 file changed, 109 insertions(+), 112 deletions(-) diff --git a/db/commitlog/commitlog.cc b/db/commitlog/commitlog.cc index d0b01212d4..692d1f9879 100644 --- a/db/commitlog/commitlog.cc +++ b/db/commitlog/commitlog.cc @@ -349,14 +349,13 @@ public: return ++_ids; } - std::exception_ptr sanity_check_size(size_t size) { + void sanity_check_size(size_t size) { if (size > max_mutation_size) { - return make_exception_ptr(std::invalid_argument( + throw std::invalid_argument( "Mutation of " + std::to_string(size) + " bytes is too large for the maximum size of " - + std::to_string(max_mutation_size))); + + std::to_string(max_mutation_size)); } - return nullptr; } future<> init(); @@ -924,133 +923,134 @@ public: }); } + void background_cycle() { + //FIXME: discarded future + (void)cycle().discard_result().handle_exception([] (auto ex) { + clogger.error("Failed to flush commits to disk: {}", ex); + }); + } + /** * Add a "mutation" to the segment. + * Should only be called from "allocate_when_possible". "this" must be secure in a shared_ptr that will not + * die. We don't keep ourselves alive (anymore) */ future<> allocate(shared_ptr writer, segment_manager::request_controller_units permit, db::timeout_clock::time_point timeout) { - if (must_sync()) { - return with_timeout(timeout, sync()).then([this, writer = std::move(writer), permit = std::move(permit), timeout] (auto s) mutable { - return s->allocate(std::move(writer), std::move(permit), timeout); - }); - } - - const auto size = writer->size(*this); - const auto s = size + writer->num_entries * entry_overhead_size + (writer->num_entries > 1 ? multi_entry_overhead_size : 0u); // total size - auto ep = _segment_manager->sanity_check_size(s); - if (ep) { - return make_exception_future<>(std::move(ep)); - } - - if (!is_still_allocating() || position() + s > _segment_manager->max_size) { // would we make the file too big? - return finish_and_get_new(timeout).then([writer = std::move(writer), permit = std::move(permit), timeout] (auto new_seg) mutable { - return new_seg->allocate(std::move(writer), std::move(permit), timeout); - }); - } else if (!_buffer.empty() && (s > _buffer_ostream.size())) { // enough data? - if (_segment_manager->cfg.mode == sync_mode::BATCH || writer->sync) { - // TODO: this could cause starvation if we're really unlucky. - // If we run batch mode and find ourselves not fit in a non-empty - // buffer, we must force a cycle and wait for it (to keep flush order) - // This will most likely cause parallel writes, and consecutive flushes. - return with_timeout(timeout, sync()).then([this, writer = std::move(writer), permit = std::move(permit), timeout] (auto new_seg) mutable { - return new_seg->allocate(std::move(writer), std::move(permit), timeout); - }); - } else { - //FIXME: discarded future - (void)cycle().discard_result().handle_exception([] (auto ex) { - clogger.error("Failed to flush commits to disk: {}", ex); - }); + for (;;) { + if (must_sync()) { + co_await with_timeout(timeout, sync()); + continue; } - } - size_t buf_memory = s; - if (_buffer.empty()) { - new_buffer(s); - buf_memory += buffer_position(); - } + const auto size = writer->size(*this); + const auto s = size + writer->num_entries * entry_overhead_size + (writer->num_entries > 1 ? multi_entry_overhead_size : 0u); // total size - if (_closed) { - return make_exception_future<>(std::runtime_error("commitlog: Cannot add data to a closed segment")); - } + _segment_manager->sanity_check_size(s); - buf_memory -= permit.release(); - _segment_manager->account_memory_usage(buf_memory); + if (!is_still_allocating() || position() + s > _segment_manager->max_size) { // would we make the file too big? + auto new_seg = co_await finish_and_get_new(timeout); + co_await new_seg->allocate(std::move(writer), std::move(permit), timeout); + break; + } else if (!_buffer.empty() && (s > _buffer_ostream.size())) { // enough data? + if (_segment_manager->cfg.mode == sync_mode::BATCH || writer->sync) { + // TODO: this could cause starvation if we're really unlucky. + // If we run batch mode and find ourselves not fit in a non-empty + // buffer, we must force a cycle and wait for it (to keep flush order) + // This will most likely cause parallel writes, and consecutive flushes. + co_await with_timeout(timeout, sync()); + continue; + } else { + background_cycle(); + } + } - auto& out = _buffer_ostream; + size_t buf_memory = s; + if (_buffer.empty()) { + new_buffer(s); + buf_memory += buffer_position(); + } - std::optional mecrc; + if (_closed) { + throw std::runtime_error("commitlog: Cannot add data to a closed segment"); + } - // if this is multi-entry write, we need to add an extra header + crc - // the header and crc formula is: - // header: - // magic : uint32_t - // size : uint32_t - // crc1 : uint32_t - crc of magic, size - // -> entries[] - // post: - // crc2 : uint32_t - crc1 + each entry crc. - if (writer->num_entries > 1) { - mecrc.emplace(); - write(out, multi_entry_size_magic); - write(out, s); - mecrc->process(multi_entry_size_magic); - mecrc->process(uint32_t(s)); - write(out, mecrc->checksum()); - } + buf_memory -= permit.release(); + _segment_manager->account_memory_usage(buf_memory); - for (size_t entry = 0; entry < writer->num_entries; ++entry) { - replay_position rp(_desc.id, position()); - auto id = writer->id(entry); - auto entry_size = writer->num_entries == 1 ? size : writer->size(*this, entry); - auto es = entry_size + entry_overhead_size; + auto& out = _buffer_ostream; - _cf_dirty[id]++; // increase use count for cf. + std::optional mecrc; - rp_handle h(static_pointer_cast(shared_from_this()), std::move(id), rp); + // if this is multi-entry write, we need to add an extra header + crc + // the header and crc formula is: + // header: + // magic : uint32_t + // size : uint32_t + // crc1 : uint32_t - crc of magic, size + // -> entries[] + // post: + // crc2 : uint32_t - crc1 + each entry crc. + if (writer->num_entries > 1) { + mecrc.emplace(); + write(out, multi_entry_size_magic); + write(out, s); + mecrc->process(multi_entry_size_magic); + mecrc->process(uint32_t(s)); + write(out, mecrc->checksum()); + } - crc32_nbo crc; + for (size_t entry = 0; entry < writer->num_entries; ++entry) { + replay_position rp(_desc.id, position()); + auto id = writer->id(entry); + auto entry_size = writer->num_entries == 1 ? size : writer->size(*this, entry); + auto es = entry_size + entry_overhead_size; - write(out, es); - crc.process(uint32_t(es)); - write(out, crc.checksum()); + _cf_dirty[id]++; // increase use count for cf. - // actual data - auto entry_out = out.write_substream(entry_size); - auto entry_data = entry_out.to_input_stream(); - writer->write(*this, entry_out, entry); - entry_data.with_stream([&] (auto data_str) { - crc.process_fragmented(ser::buffer_view>::iterator>(data_str)); - }); + rp_handle h(static_pointer_cast(shared_from_this()), std::move(id), rp); + + crc32_nbo crc; + + write(out, es); + crc.process(uint32_t(es)); + write(out, crc.checksum()); + + // actual data + auto entry_out = out.write_substream(entry_size); + auto entry_data = entry_out.to_input_stream(); + writer->write(*this, entry_out, entry); + entry_data.with_stream([&] (auto data_str) { + crc.process_fragmented(ser::buffer_view>::iterator>(data_str)); + }); + + auto checksum = crc.checksum(); + write(out, checksum); + if (mecrc) { + mecrc->process(checksum); + } + + writer->result(entry, std::move(h)); + } - auto checksum = crc.checksum(); - write(out, checksum); if (mecrc) { - mecrc->process(checksum); + // write the crc of header + all sub-entry crc + write(out, mecrc->checksum()); } - writer->result(entry, std::move(h)); - } + ++_segment_manager->totals.allocation_count; + ++_num_allocs; - if (mecrc) { - // write the crc of header + all sub-entry crc - write(out, mecrc->checksum()); - } - - ++_segment_manager->totals.allocation_count; - ++_num_allocs; - - if (_segment_manager->cfg.mode == sync_mode::BATCH || writer->sync) { - return batch_cycle(timeout).discard_result(); - } else { - // If this buffer alone is too big, potentially bigger than the maximum allowed size, - // then no other request will be allowed in to force the cycle()ing of this buffer. We - // have to do it ourselves. - if ((buffer_position() >= (db::commitlog::segment::default_size))) { - //FIXME: discarded future. - (void)cycle().discard_result().handle_exception([] (auto ex) { - clogger.error("Failed to flush commits to disk: {}", ex); - }); + if (_segment_manager->cfg.mode == sync_mode::BATCH || writer->sync) { + co_await batch_cycle(timeout).discard_result(); + } else { + // If this buffer alone is too big, potentially bigger than the maximum allowed size, + // then no other request will be allowed in to force the cycle()ing of this buffer. We + // have to do it ourselves. + if ((buffer_position() >= (db::commitlog::segment::default_size))) { + background_cycle(); + } } - return make_ready_future<>(); + break; } } @@ -1118,10 +1118,7 @@ db::commitlog::segment_manager::allocate_when_possible(shared_ptr // If this is already too big now, we should throw early. It's also a correctness issue, since // if we are too big at this moment we'll never reach allocate() to actually throw at that // point. - auto ep = sanity_check_size(size); - if (ep) { - return make_exception_future<>(std::move(ep)); - } + sanity_check_size(size); auto fut = get_units(_request_controller, size, timeout); if (_request_controller.waiters()) {