commitlog: coroutinize segment::allocate

This commit is contained in:
Calle Wilund
2021-06-28 11:31:48 +00:00
parent df822e09e0
commit e545b382bd

View File

@@ -349,14 +349,13 @@ public:
return ++_ids;
}
// Rejects a request whose `size` exceeds max_mutation_size with a
// std::invalid_argument describing both the offending and the maximum size.
//
// NOTE(review): this is a rendered diff without +/- markers, so two variants
// of the function are interleaved line by line:
//  - one returns std::exception_ptr (nullptr when the size is acceptable,
//    otherwise an exception_ptr holding the invalid_argument);
//  - one returns void and throws the invalid_argument directly.
// Judging by the call sites in the other hunks, the throwing variant appears
// to be the post-change form - confirm against the real file.
std::exception_ptr sanity_check_size(size_t size) {
void sanity_check_size(size_t size) {
if (size > max_mutation_size) {
return make_exception_ptr(std::invalid_argument(
throw std::invalid_argument(
"Mutation of " + std::to_string(size)
+ " bytes is too large for the maximum size of "
+ std::to_string(max_mutation_size)));
+ std::to_string(max_mutation_size));
}
return nullptr;
}
future<> init();
@@ -924,133 +923,134 @@ public:
});
}
// Starts a cycle() without waiting for it: the resulting future is discarded
// on purpose, and a failure is only reported through clogger.error.
// Callers therefore never observe the outcome of this cycle.
void background_cycle() {
//FIXME: discarded future
(void)cycle().discard_result().handle_exception([] (auto ex) {
clogger.error("Failed to flush commits to disk: {}", ex);
});
}
/**
* Add a "mutation" (the entries supplied by `writer`) to the segment.
* Should only be called from "allocate_when_possible". "this" must be held in a shared_ptr that
* outlives the call. The segment does not keep itself alive (anymore).
*
* NOTE(review): this is a rendered diff without +/- markers. The continuation-style lines
* (return ...then(...), make_exception_future, make_ready_future) and the coroutine-style lines
* (co_await / continue / break inside `for (;;)`) are interleaved below and belong to two
* different versions of the same function.
*
* Steps visible in both variants:
*  - if must_sync(), run sync() under with_timeout(timeout, ...) and retry the allocation;
*  - compute the total size `s` = payload size + entry_overhead_size per entry
*    (+ multi_entry_overhead_size when there is more than one entry) and pass it to
*    sanity_check_size;
*  - if the segment is no longer allocating, or position() + s exceeds max_size, obtain a new
*    segment via finish_and_get_new(timeout) and allocate there instead;
*  - if the non-empty buffer cannot hold `s`: in BATCH mode or for a sync writer, sync() and
*    retry; otherwise trigger a cycle in the background;
*  - fail with std::runtime_error if the segment is _closed;
*  - write the optional multi-entry header, then for each entry its size, size crc, data and
*    data crc, handing an rp_handle to writer->result(entry, ...);
*  - finally, in BATCH mode or for a sync writer wait for batch_cycle(timeout); otherwise
*    trigger a background cycle only when buffer_position() has reached default_size.
*
* @param writer  produces the entry sizes, ids and payload, and receives the resulting handles
* @param permit  request-controller units; released here and subtracted from the accounted memory
* @param timeout deadline passed to sync(), finish_and_get_new() and batch_cycle()
*/
future<> allocate(shared_ptr<entry_writer> writer, segment_manager::request_controller_units permit, db::timeout_clock::time_point timeout) {
if (must_sync()) {
return with_timeout(timeout, sync()).then([this, writer = std::move(writer), permit = std::move(permit), timeout] (auto s) mutable {
return s->allocate(std::move(writer), std::move(permit), timeout);
});
}
const auto size = writer->size(*this);
const auto s = size + writer->num_entries * entry_overhead_size + (writer->num_entries > 1 ? multi_entry_overhead_size : 0u); // total size
auto ep = _segment_manager->sanity_check_size(s);
if (ep) {
return make_exception_future<>(std::move(ep));
}
if (!is_still_allocating() || position() + s > _segment_manager->max_size) { // would we make the file too big?
return finish_and_get_new(timeout).then([writer = std::move(writer), permit = std::move(permit), timeout] (auto new_seg) mutable {
return new_seg->allocate(std::move(writer), std::move(permit), timeout);
});
} else if (!_buffer.empty() && (s > _buffer_ostream.size())) { // enough data?
if (_segment_manager->cfg.mode == sync_mode::BATCH || writer->sync) {
// TODO: this could cause starvation if we're really unlucky.
// If we run batch mode and find ourselves not fit in a non-empty
// buffer, we must force a cycle and wait for it (to keep flush order)
// This will most likely cause parallel writes, and consecutive flushes.
return with_timeout(timeout, sync()).then([this, writer = std::move(writer), permit = std::move(permit), timeout] (auto new_seg) mutable {
return new_seg->allocate(std::move(writer), std::move(permit), timeout);
});
} else {
//FIXME: discarded future
(void)cycle().discard_result().handle_exception([] (auto ex) {
clogger.error("Failed to flush commits to disk: {}", ex);
});
for (;;) {
if (must_sync()) {
co_await with_timeout(timeout, sync());
continue;
}
}
size_t buf_memory = s;
if (_buffer.empty()) {
new_buffer(s);
buf_memory += buffer_position();
}
const auto size = writer->size(*this);
const auto s = size + writer->num_entries * entry_overhead_size + (writer->num_entries > 1 ? multi_entry_overhead_size : 0u); // total size
if (_closed) {
return make_exception_future<>(std::runtime_error("commitlog: Cannot add data to a closed segment"));
}
_segment_manager->sanity_check_size(s);
// Only the part of buf_memory not already covered by the released permit is accounted.
// NOTE(review): presumably permit.release() returns the number of units it held - confirm.
buf_memory -= permit.release();
_segment_manager->account_memory_usage(buf_memory);
if (!is_still_allocating() || position() + s > _segment_manager->max_size) { // would we make the file too big?
auto new_seg = co_await finish_and_get_new(timeout);
co_await new_seg->allocate(std::move(writer), std::move(permit), timeout);
break;
} else if (!_buffer.empty() && (s > _buffer_ostream.size())) { // enough data?
if (_segment_manager->cfg.mode == sync_mode::BATCH || writer->sync) {
// TODO: this could cause starvation if we're really unlucky.
// If we run batch mode and find ourselves not fit in a non-empty
// buffer, we must force a cycle and wait for it (to keep flush order)
// This will most likely cause parallel writes, and consecutive flushes.
co_await with_timeout(timeout, sync());
continue;
} else {
background_cycle();
}
}
auto& out = _buffer_ostream;
size_t buf_memory = s;
if (_buffer.empty()) {
new_buffer(s);
buf_memory += buffer_position();
}
std::optional<crc32_nbo> mecrc;
if (_closed) {
throw std::runtime_error("commitlog: Cannot add data to a closed segment");
}
// if this is multi-entry write, we need to add an extra header + crc
// the header and crc formula is:
// header:
// magic : uint32_t
// size : uint32_t
// crc1 : uint32_t - crc of magic, size
// -> entries[]
// post:
// crc2 : uint32_t - crc1 + each entry crc.
if (writer->num_entries > 1) {
mecrc.emplace();
write<uint32_t>(out, multi_entry_size_magic);
write<uint32_t>(out, s);
mecrc->process(multi_entry_size_magic);
mecrc->process(uint32_t(s));
write<uint32_t>(out, mecrc->checksum());
}
buf_memory -= permit.release();
_segment_manager->account_memory_usage(buf_memory);
for (size_t entry = 0; entry < writer->num_entries; ++entry) {
replay_position rp(_desc.id, position());
auto id = writer->id(entry);
auto entry_size = writer->num_entries == 1 ? size : writer->size(*this, entry);
auto es = entry_size + entry_overhead_size;
auto& out = _buffer_ostream;
_cf_dirty[id]++; // increase use count for cf.
std::optional<crc32_nbo> mecrc;
rp_handle h(static_pointer_cast<cf_holder>(shared_from_this()), std::move(id), rp);
// if this is multi-entry write, we need to add an extra header + crc
// the header and crc formula is:
// header:
// magic : uint32_t
// size : uint32_t
// crc1 : uint32_t - crc of magic, size
// -> entries[]
// post:
// crc2 : uint32_t - crc1 + each entry crc.
if (writer->num_entries > 1) {
mecrc.emplace();
write<uint32_t>(out, multi_entry_size_magic);
write<uint32_t>(out, s);
mecrc->process(multi_entry_size_magic);
mecrc->process(uint32_t(s));
write<uint32_t>(out, mecrc->checksum());
}
crc32_nbo crc;
for (size_t entry = 0; entry < writer->num_entries; ++entry) {
replay_position rp(_desc.id, position());
auto id = writer->id(entry);
auto entry_size = writer->num_entries == 1 ? size : writer->size(*this, entry);
auto es = entry_size + entry_overhead_size;
write<uint32_t>(out, es);
crc.process(uint32_t(es));
write<uint32_t>(out, crc.checksum());
_cf_dirty[id]++; // increase use count for cf.
// actual data
auto entry_out = out.write_substream(entry_size);
auto entry_data = entry_out.to_input_stream();
writer->write(*this, entry_out, entry);
entry_data.with_stream([&] (auto data_str) {
crc.process_fragmented(ser::buffer_view<typename std::vector<temporary_buffer<char>>::iterator>(data_str));
});
rp_handle h(static_pointer_cast<cf_holder>(shared_from_this()), std::move(id), rp);
crc32_nbo crc;
write<uint32_t>(out, es);
crc.process(uint32_t(es));
write<uint32_t>(out, crc.checksum());
// actual data
auto entry_out = out.write_substream(entry_size);
auto entry_data = entry_out.to_input_stream();
writer->write(*this, entry_out, entry);
entry_data.with_stream([&] (auto data_str) {
crc.process_fragmented(ser::buffer_view<typename std::vector<temporary_buffer<char>>::iterator>(data_str));
});
auto checksum = crc.checksum();
write<uint32_t>(out, checksum);
if (mecrc) {
mecrc->process(checksum);
}
writer->result(entry, std::move(h));
}
auto checksum = crc.checksum();
write<uint32_t>(out, checksum);
if (mecrc) {
mecrc->process(checksum);
// write the crc of header + all sub-entry crc
write<uint32_t>(out, mecrc->checksum());
}
writer->result(entry, std::move(h));
}
++_segment_manager->totals.allocation_count;
++_num_allocs;
if (mecrc) {
// write the crc of header + all sub-entry crc
write<uint32_t>(out, mecrc->checksum());
}
++_segment_manager->totals.allocation_count;
++_num_allocs;
if (_segment_manager->cfg.mode == sync_mode::BATCH || writer->sync) {
return batch_cycle(timeout).discard_result();
} else {
// If this buffer alone is too big, potentially bigger than the maximum allowed size,
// then no other request will be allowed in to force the cycle()ing of this buffer. We
// have to do it ourselves.
if ((buffer_position() >= (db::commitlog::segment::default_size))) {
//FIXME: discarded future.
(void)cycle().discard_result().handle_exception([] (auto ex) {
clogger.error("Failed to flush commits to disk: {}", ex);
});
if (_segment_manager->cfg.mode == sync_mode::BATCH || writer->sync) {
co_await batch_cycle(timeout).discard_result();
} else {
// If this buffer alone is too big, potentially bigger than the maximum allowed size,
// then no other request will be allowed in to force the cycle()ing of this buffer. We
// have to do it ourselves.
if ((buffer_position() >= (db::commitlog::segment::default_size))) {
background_cycle();
}
}
return make_ready_future<>();
break;
}
}
@@ -1118,10 +1118,7 @@ db::commitlog::segment_manager::allocate_when_possible(shared_ptr<entry_writer>
// If this is already too big now, we should throw early. It's also a correctness issue, since
// if we are too big at this moment we'll never reach allocate() to actually throw at that
// point.
auto ep = sanity_check_size(size);
if (ep) {
return make_exception_future<>(std::move(ep));
}
sanity_check_size(size);
auto fut = get_units(_request_controller, size, timeout);
if (_request_controller.waiters()) {