From b680de930c283efda06e7591cefbc23dbeac1fc1 Mon Sep 17 00:00:00 2001 From: Piotr Jastrzebski Date: Wed, 10 May 2017 17:47:34 +0200 Subject: [PATCH] partition_entry: Introduce apply_to_incomplete() Signed-off-by: Piotr Jastrzebski [tgrabiec: - extracted from a larger commit - fix heap comparator in apply_incomplete_target to order versions properly - extracted partition_version detaching into partition_entry::with_detached_versions() - dropped unnecessary rows_iterator::_version field - dropped unnecessary allocation of rows_entry and key copies in rows_iterator - dropped row_pointer - replaced apply_reversibly() with weaker and faster apply() - added handling of dummy entries at any position - fixed exception safety issue in apply_to_incomplete() which may result in data loss. We cannot move data out of applied versions into a new synthetic row and then apply it, because if exception happens in the middle, the data which was moved from the source will be lost. To fix that, row_iterator::consume_row() is introduced which allows in-place consumption of data without construction of temporary deletable_row. ] --- partition_version.cc | 236 +++++++++++++++++++++++++++++++++++++++++++ partition_version.hh | 26 +++++ 2 files changed, 262 insertions(+) diff --git a/partition_version.cc b/partition_version.cc index 86bdc9e3d1..b41c5d686c 100644 --- a/partition_version.cc +++ b/partition_version.cc @@ -317,6 +317,190 @@ void partition_entry::apply(const schema& s, partition_entry&& pe, const schema& } } +// Iterates over all rows in mutation represented by partition_entry. +// It abstracts away the fact that rows may be spread across multiple versions. +class partition_entry::rows_iterator final { + struct version { + mutation_partition::rows_type::iterator current_row; + mutation_partition::rows_type* rows; + bool can_move; + struct compare { + const rows_entry::tri_compare& _cmp; + public: + explicit compare(const rows_entry::tri_compare& cmp) : _cmp(cmp) { } + bool operator()(const version& a, const version& b) const { + return _cmp(*a.current_row, *b.current_row) > 0; + } + }; + }; + const schema& _schema; + rows_entry::tri_compare _rows_cmp; + rows_entry::compare _rows_less_cmp; + version::compare _version_cmp; + std::vector _heap; + std::vector _current_row; +public: + rows_iterator(partition_version* version, const schema& schema) + : _schema(schema) + , _rows_cmp(schema) + , _rows_less_cmp(schema) + , _version_cmp(_rows_cmp) + { + bool can_move = true; + while (version) { + can_move &= !version->is_referenced(); + auto& rows = version->partition().clustered_rows(); + if (!rows.empty()) { + _heap.push_back({rows.begin(), &rows, can_move}); + } + version = version->next(); + } + boost::range::make_heap(_heap, _version_cmp); + move_to_next_row(); + } + bool done() const { + return _current_row.empty(); + } + // Return clustering key of the current row in source. + // Valid only when !is_dummy(). + const clustering_key& key() const { + return _current_row[0].current_row->key(); + } + bool is_dummy() const { + return bool(_current_row[0].current_row->dummy()); + } + template + void consume_row(RowConsumer&& consumer) { + assert(!_current_row.empty()); + // versions in _current_row are not ordered but it is not a problem + // due to the fact that all rows are continuous. + for (version& v : _current_row) { + if (!v.can_move) { + consumer(deletable_row(v.current_row->row())); + } else { + consumer(std::move(v.current_row->row())); + } + } + } + void remove_current_row_when_possible() { + assert(!_current_row.empty()); + auto deleter = current_deleter(); + for (version& v : _current_row) { + if (v.can_move) { + v.rows->erase_and_dispose(v.current_row, deleter); + } + } + } + void move_to_next_row() { + _current_row.clear(); + while (!_heap.empty() && + (_current_row.empty() || _rows_cmp(*_current_row[0].current_row, *_heap[0].current_row) == 0)) { + boost::range::pop_heap(_heap, _version_cmp); + auto& curr = _heap.back(); + _current_row.push_back({curr.current_row, curr.rows, curr.can_move}); + ++curr.current_row; + if (curr.current_row == curr.rows->end()) { + _heap.pop_back(); + } else { + boost::range::push_heap(_heap, _version_cmp); + } + } + } +}; + +namespace { + +// When applying partition_entry to an incomplete partition_entry this class is used to represent +// the target incomplete partition_entry. It encapsulates the logic needed for handling multiple versions. +class apply_incomplete_target final { + struct version { + mutation_partition::rows_type::iterator current_row; + mutation_partition::rows_type* rows; + size_t version_no; + + struct compare { + const rows_entry::tri_compare& _cmp; + public: + explicit compare(const rows_entry::tri_compare& cmp) : _cmp(cmp) { } + bool operator()(const version& a, const version& b) const { + auto res = _cmp(*a.current_row, *b.current_row); + return res > 0 || (res == 0 && a.version_no > b.version_no); + } + }; + }; + const schema& _schema; + partition_entry& _pe; + rows_entry::tri_compare _rows_cmp; + rows_entry::compare _rows_less_cmp; + version::compare _version_cmp; + std::vector _heap; + mutation_partition::rows_type::iterator _next_in_latest_version; +public: + apply_incomplete_target(partition_entry& pe, const schema& schema) + : _schema(schema) + , _pe(pe) + , _rows_cmp(schema) + , _rows_less_cmp(schema) + , _version_cmp(_rows_cmp) + { + size_t version_no = 0; + _next_in_latest_version = pe.version()->partition().clustered_rows().begin(); + for (auto&& v : pe.version()->elements_from_this()) { + if (!v.partition().clustered_rows().empty()) { + _heap.push_back({v.partition().clustered_rows().begin(), &v.partition().clustered_rows(), version_no}); + } + ++version_no; + } + boost::range::make_heap(_heap, _version_cmp); + } + // Applies the row from source. + // Must be called for rows with monotonic keys. + // Weak exception guarantees. The target and source partitions are left + // in a state such that the two still commute to the same value on retry. + void apply(partition_entry::rows_iterator& src) { + auto&& key = src.key(); + while (!_heap.empty() && _rows_less_cmp(*_heap[0].current_row, key)) { + boost::range::pop_heap(_heap, _version_cmp); + auto& curr = _heap.back(); + curr.current_row = curr.rows->lower_bound(key, _rows_less_cmp); + if (curr.version_no == 0) { + _next_in_latest_version = curr.current_row; + } + if (curr.current_row == curr.rows->end()) { + _heap.pop_back(); + } else { + boost::range::push_heap(_heap, _version_cmp); + } + } + + if (!_heap.empty()) { + rows_entry& next_row = *_heap[0].current_row; + if (_rows_cmp(key, next_row) == 0) { + if (next_row.dummy()) { + return; + } + } else if (!next_row.continuous()) { + return; + } + } + + mutation_partition::rows_type& rows = _pe.version()->partition().clustered_rows(); + if (_next_in_latest_version != rows.end() && _rows_cmp(key, *_next_in_latest_version) == 0) { + src.consume_row([&] (deletable_row&& row) { + _next_in_latest_version->row().apply(_schema, std::move(row)); + }); + } else { + auto e = current_allocator().construct(key); + e->set_continuous(_heap.empty() ? is_continuous::yes : _heap[0].current_row->continuous()); + rows.insert_before(_next_in_latest_version, *e); + src.consume_row([&] (deletable_row&& row) { + e->row().apply(_schema, std::move(row)); + }); + } + } +}; + +} // namespace template void partition_entry::with_detached_versions(Func&& func) { @@ -342,6 +526,58 @@ void partition_entry::with_detached_versions(Func&& func) { func(current); } +void partition_entry::apply_to_incomplete(const schema& s, partition_entry&& pe, const schema& pe_schema) +{ + if (s.version() != pe_schema.version()) { + partition_entry entry(pe.squashed(pe_schema.shared_from_this(), s.shared_from_this())); + entry.with_detached_versions([&] (partition_version* v) { + apply_to_incomplete(s, v); + }); + } else { + pe.with_detached_versions([&](partition_version* v) { + apply_to_incomplete(s, v); + }); + } +} + +void partition_entry::apply_to_incomplete(const schema& s, partition_version* version) { + partition_version& dst = open_version(s); + + bool can_move = true; + auto current = version; + bool static_row_continuous = dst.partition().static_row_continuous(); + while (current) { + can_move &= !current->is_referenced(); + dst.partition().apply(current->partition().partition_tombstone()); + if (static_row_continuous) { + row& static_row = dst.partition().static_row(); + if (can_move) { + static_row.apply(s, column_kind::static_column, std::move(current->partition().static_row())); + } else { + static_row.apply(s, column_kind::static_column, current->partition().static_row()); + } + } + range_tombstone_list& tombstones = dst.partition().row_tombstones(); + if (can_move) { + tombstones.apply_reversibly(s, current->partition().row_tombstones()).cancel(); + } else { + tombstones.apply(s, current->partition().row_tombstones()); + } + current = current->next(); + } + + partition_entry::rows_iterator source(version, s); + apply_incomplete_target target(*this, s); + + while (!source.done()) { + if (!source.is_dummy()) { + target.apply(source); + } + source.remove_current_row_when_possible(); + source.move_to_next_row(); + } +} + mutation_partition partition_entry::squashed(schema_ptr from, schema_ptr to) { mutation_partition mp(to); diff --git a/partition_version.hh b/partition_version.hh index 29407d4a78..b5e915e9f9 100644 --- a/partition_version.hh +++ b/partition_version.hh @@ -225,6 +225,11 @@ public: } unsigned version_count(); + + bool at_latest_version() const { + return _entry != nullptr; + } + tombstone partition_tombstone() const; row static_row() const; mutation_partition squashed() const; @@ -253,7 +258,9 @@ private: void set_version(partition_version*); void apply(const schema& s, partition_version* pv, const schema& pv_schema); + void apply_to_incomplete(const schema& s, partition_version* other); public: + class rows_iterator; partition_entry() = default; explicit partition_entry(mutation_partition mp); ~partition_entry(); @@ -294,6 +301,25 @@ public: // Assumes this instance and mpv are fully continuous. void apply(const schema& s, mutation_partition_view mpv, const schema& mp_schema); + // Adds mutation_partition represented by "other" to the one represented + // by this entry. + // + // The argument must be fully-continuous. + // + // The rules of addition differ from that used by regular + // mutation_partition addition with regards to continuity. The continuity + // of the result is the same as in this instance. Information from "other" + // which is incomplete in this instance is dropped. In other words, this + // performs set intersection on continuity information, drops information + // which falls outside of the continuity range, and applies regular merging + // rules for the rest. + // + // Weak exception guarantees. + // If an exception is thrown this and pe will be left in some valid states + // such that if the operation is retried (possibly many times) and eventually + // succeeds the result will be as if the first attempt didn't fail. + void apply_to_incomplete(const schema& s, partition_entry&& pe, const schema& pe_schema); + // Weak exception guarantees. // If an exception is thrown this and pe will be left in some valid states // such that if the operation is retried (possibly many times) and eventually