Files
scylladb/mutation_writer/feed_writers.hh

82 lines
1.9 KiB
C++

/*
* Copyright (C) 2020-present ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#pragma once
#include <seastar/coroutine/as_future.hh>
#include "readers/queue.hh"
namespace mutation_writer {
class bucket_writer {
schema_ptr _schema;
queue_reader_handle _handle;
future<> _consume_fut;
private:
bucket_writer(schema_ptr schema, std::pair<mutation_reader, queue_reader_handle> queue_reader, mutation_reader_consumer& consumer);
public:
bucket_writer(schema_ptr schema, reader_permit permit, mutation_reader_consumer& consumer);
future<> consume(mutation_fragment_v2 mf);
void consume_end_of_stream();
void abort(std::exception_ptr ep) noexcept;
future<> close() noexcept;
};
template <typename Writer>
requires MutationFragmentConsumerV2<Writer, future<>>
future<> feed_writer(mutation_reader&& rd_ref, Writer wr) {
// Only move in reader if stack was successfully allocated, so caller can close reader otherwise.
auto rd = std::move(rd_ref);
std::exception_ptr ex;
try {
while (!rd.is_end_of_stream()) {
co_await rd.fill_buffer();
while (!rd.is_buffer_empty()) {
co_await rd.pop_mutation_fragment().consume(wr);
}
}
} catch (...) {
ex = std::current_exception();
}
co_await rd.close();
try {
if (ex) {
wr.abort(ex);
} else {
wr.consume_end_of_stream();
}
} catch (...) {
if (!ex) {
ex = std::current_exception();
}
}
auto f = co_await coroutine::as_future(wr.close());
if (f.failed()) [[unlikely]] {
// Need to consume the failed future exception even if not used
auto close_ex = f.get_exception();
if (!ex) {
ex = std::move(close_ex);
}
}
if (ex) [[unlikely]] {
std::rethrow_exception(ex);
}
}
}