mirror of
https://github.com/scylladb/scylladb.git
synced 2026-05-12 19:02:12 +00:00
streamed_mutation: add mutation_merger
Signed-off-by: Paweł Dziepak <pdziepak@scylladb.com>
This commit is contained in:
@@ -23,6 +23,7 @@
|
||||
|
||||
#include "mutation.hh"
|
||||
#include "streamed_mutation.hh"
|
||||
#include "utils/move.hh"
|
||||
|
||||
mutation_fragment::mutation_fragment(static_row&& r)
|
||||
: _kind(kind::static_row), _data(std::make_unique<data>())
|
||||
@@ -214,3 +215,165 @@ streamed_mutation streamed_mutation_from_mutation(mutation m)
|
||||
|
||||
return make_streamed_mutation<reader>(std::move(m));
|
||||
}
|
||||
|
||||
class mutation_merger final : public streamed_mutation::impl {
|
||||
std::vector<streamed_mutation> _original_readers;
|
||||
struct streamed_reader {
|
||||
tombstone current_tombstone;
|
||||
streamed_mutation* reader;
|
||||
};
|
||||
std::vector<streamed_reader> _next_readers;
|
||||
// FIXME: do not store all in-flight clustering rows in memory
|
||||
struct row_and_reader {
|
||||
mutation_fragment row;
|
||||
streamed_reader reader;
|
||||
};
|
||||
std::vector<row_and_reader> _readers;
|
||||
tombstone _current_tombstone;
|
||||
private:
|
||||
static void update_current_tombstone(streamed_reader& sr, mutation_fragment& mf) {
|
||||
if (mf.is_range_tombstone_begin()) {
|
||||
assert(!sr.current_tombstone);
|
||||
sr.current_tombstone = mf.as_range_tombstone_begin().tomb();
|
||||
} else if (mf.is_range_tombstone_end()) {
|
||||
assert(sr.current_tombstone);
|
||||
sr.current_tombstone = { };
|
||||
}
|
||||
}
|
||||
|
||||
void read_next() {
|
||||
if (_readers.empty()) {
|
||||
_end_of_stream = true;
|
||||
return;
|
||||
}
|
||||
|
||||
position_in_partition::less_compare cmp(*_schema);
|
||||
auto heap_compare = [&] (auto& a, auto& b) { return cmp(b.row, a.row); };
|
||||
|
||||
boost::range::pop_heap(_readers, heap_compare);
|
||||
auto result = std::move(_readers.back().row);
|
||||
update_current_tombstone(_readers.back().reader, result);
|
||||
_next_readers.emplace_back(std::move(_readers.back().reader));
|
||||
_readers.pop_back();
|
||||
|
||||
while (!_readers.empty()) {
|
||||
if (cmp(result, _readers.front().row)) {
|
||||
break;
|
||||
}
|
||||
boost::range::pop_heap(_readers, heap_compare);
|
||||
update_current_tombstone(_readers.back().reader, _readers.back().row);
|
||||
result.apply(*_schema, std::move(_readers.back().row));
|
||||
_next_readers.emplace_back(std::move(_readers.back().reader));
|
||||
_readers.pop_back();
|
||||
}
|
||||
|
||||
bool can_emit_result = true;
|
||||
if (result.is_range_tombstone_begin()) {
|
||||
auto new_t = result.as_range_tombstone_begin().tomb();
|
||||
can_emit_result = _current_tombstone < new_t;
|
||||
if (can_emit_result) {
|
||||
if (_current_tombstone) {
|
||||
auto& rtb = result.as_range_tombstone_begin();
|
||||
auto rte = range_tombstone_end(rtb.key(), invert_kind(rtb.kind()));
|
||||
push_mutation_fragment(std::move(rte));
|
||||
}
|
||||
push_mutation_fragment(std::move(result));
|
||||
_current_tombstone = new_t;
|
||||
}
|
||||
} else if (result.is_range_tombstone_end()) {
|
||||
tombstone new_t;
|
||||
for (auto& r_a_r : _readers) {
|
||||
new_t = std::max(new_t, r_a_r.reader.current_tombstone);
|
||||
}
|
||||
for (auto& r : _next_readers) {
|
||||
new_t = std::max(new_t, r.current_tombstone);
|
||||
}
|
||||
can_emit_result = new_t != _current_tombstone;
|
||||
if (can_emit_result) {
|
||||
if (new_t) {
|
||||
auto& rte = result.as_range_tombstone_end();
|
||||
auto rtb = range_tombstone_begin(rte.key(), invert_kind(rte.kind()), new_t);
|
||||
push_mutation_fragment(std::move(result));
|
||||
push_mutation_fragment(std::move(rtb));
|
||||
} else {
|
||||
push_mutation_fragment(std::move(result));
|
||||
}
|
||||
_current_tombstone = new_t;
|
||||
}
|
||||
} else {
|
||||
push_mutation_fragment(std::move(result));
|
||||
}
|
||||
}
|
||||
|
||||
void do_fill_buffer() {
|
||||
position_in_partition::less_compare cmp(*_schema);
|
||||
auto heap_compare = [&] (auto& a, auto& b) { return cmp(b.row, a.row); };
|
||||
|
||||
for (auto& rd : _next_readers) {
|
||||
if (rd.reader->is_buffer_empty()) {
|
||||
assert(rd.reader->is_end_of_stream());
|
||||
continue;
|
||||
}
|
||||
_readers.emplace_back(row_and_reader { rd.reader->pop_mutation_fragment(), std::move(rd) });
|
||||
boost::range::push_heap(_readers, heap_compare);
|
||||
}
|
||||
_next_readers.clear();
|
||||
|
||||
read_next();
|
||||
}
|
||||
void prefill_buffer() {
|
||||
while (!is_end_of_stream() && !is_buffer_full()) {
|
||||
for (auto& rd : _next_readers) {
|
||||
if (rd.reader->is_buffer_empty() && !rd.reader->is_end_of_stream()) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
do_fill_buffer();
|
||||
}
|
||||
}
|
||||
|
||||
static tombstone merge_partition_tombstones(const std::vector<streamed_mutation>& readers) {
|
||||
tombstone t;
|
||||
for (auto& r : readers) {
|
||||
t.apply(r.partition_tombstone());
|
||||
}
|
||||
return t;
|
||||
}
|
||||
protected:
|
||||
virtual future<> fill_buffer() override {
|
||||
if (_next_readers.empty()) {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
while (!is_end_of_stream() && !is_buffer_full()) {
|
||||
std::vector<future<>> more_data;
|
||||
for (auto& rd : _next_readers) {
|
||||
if (rd.reader->is_buffer_empty() && !rd.reader->is_end_of_stream()) {
|
||||
more_data.emplace_back(rd.reader->fill_buffer());
|
||||
}
|
||||
}
|
||||
if (!more_data.empty()) {
|
||||
return parallel_for_each(std::move(more_data), [] (auto& f) { return std::move(f); }).then([this] { return fill_buffer(); });
|
||||
}
|
||||
do_fill_buffer();
|
||||
}
|
||||
return make_ready_future<>();
|
||||
}
|
||||
public:
|
||||
mutation_merger(schema_ptr s, dht::decorated_key dk, std::vector<streamed_mutation> readers)
|
||||
: streamed_mutation::impl(std::move(s), std::move(dk), merge_partition_tombstones(readers))
|
||||
, _original_readers(std::move(readers))
|
||||
{
|
||||
_next_readers.reserve(_original_readers.size());
|
||||
_readers.reserve(_original_readers.size());
|
||||
for (auto& rd : _original_readers) {
|
||||
_next_readers.emplace_back(streamed_reader { { }, &rd });
|
||||
}
|
||||
prefill_buffer();
|
||||
}
|
||||
};
|
||||
|
||||
// Creates a streamed_mutation that merges all streams in ms.
// ms must not be empty (asserted). The schema and decorated key of the
// result are taken from the last element of ms.
streamed_mutation merge_mutations(std::vector<streamed_mutation> ms)
{
    assert(!ms.empty());
    // Copy what is needed from the last stream before ms is handed over.
    auto& last = ms.back();
    auto s = last.schema();
    auto dk = last.decorated_key();
    return make_streamed_mutation<mutation_merger>(std::move(s), std::move(dk), std::move(ms));
}
|
||||
|
||||
@@ -535,4 +535,7 @@ auto consume(streamed_mutation& m, Consumer consumer) {
|
||||
|
||||
class mutation;
|
||||
|
||||
streamed_mutation streamed_mutation_from_mutation(mutation);
|
||||
streamed_mutation streamed_mutation_from_mutation(mutation);
|
||||
|
||||
// Requires all streamed_mutations to have the same schema; the merged stream takes its schema and decorated key from the last element of the vector.
|
||||
streamed_mutation merge_mutations(std::vector<streamed_mutation>);
|
||||
|
||||
Reference in New Issue
Block a user