partition_entry: Introduce apply_to_incomplete()

Signed-off-by: Piotr Jastrzebski <piotr@scylladb.com>

[tgrabiec:
  - extracted from a larger commit
  - fix heap comparator in apply_incomplete_target to order versions properly
  - extracted partition_version detaching into
    partition_entry::with_detached_versions()
  - dropped unnecessary rows_iterator::_version field
  - dropped unnecessary allocation of rows_entry and key copies
    in rows_iterator
  - dropped row_pointer
  - replaced apply_reversibly() with weaker and faster apply()
  - added handling of dummy entries at any position
  - fixed exception safety issue in apply_to_incomplete() which may
    result in data loss. We cannot move data out of applied versions
    into a new synthetic row and then apply it, because if exception
    happens in the middle, the data which was moved from the source
    will be lost. To fix that, row_iterator::consume_row() is
    introduced which allows in-place consumption of data without
    construction of temporary deletable_row.
  ]
This commit is contained in:
Piotr Jastrzebski
2017-05-10 17:47:34 +02:00
committed by Tomasz Grabiec
parent b6ce963200
commit b680de930c
2 changed files with 262 additions and 0 deletions

View File

@@ -317,6 +317,190 @@ void partition_entry::apply(const schema& s, partition_entry&& pe, const schema&
}
}
// Iterates over all rows in mutation represented by partition_entry.
// It abstracts away the fact that rows may be spread across multiple versions.
class partition_entry::rows_iterator final {
struct version {
mutation_partition::rows_type::iterator current_row;
mutation_partition::rows_type* rows;
bool can_move;
struct compare {
const rows_entry::tri_compare& _cmp;
public:
explicit compare(const rows_entry::tri_compare& cmp) : _cmp(cmp) { }
bool operator()(const version& a, const version& b) const {
return _cmp(*a.current_row, *b.current_row) > 0;
}
};
};
const schema& _schema;
rows_entry::tri_compare _rows_cmp;
rows_entry::compare _rows_less_cmp;
version::compare _version_cmp;
std::vector<version> _heap;
std::vector<version> _current_row;
public:
rows_iterator(partition_version* version, const schema& schema)
: _schema(schema)
, _rows_cmp(schema)
, _rows_less_cmp(schema)
, _version_cmp(_rows_cmp)
{
bool can_move = true;
while (version) {
can_move &= !version->is_referenced();
auto& rows = version->partition().clustered_rows();
if (!rows.empty()) {
_heap.push_back({rows.begin(), &rows, can_move});
}
version = version->next();
}
boost::range::make_heap(_heap, _version_cmp);
move_to_next_row();
}
bool done() const {
return _current_row.empty();
}
// Return clustering key of the current row in source.
// Valid only when !is_dummy().
const clustering_key& key() const {
return _current_row[0].current_row->key();
}
bool is_dummy() const {
return bool(_current_row[0].current_row->dummy());
}
template<typename RowConsumer>
void consume_row(RowConsumer&& consumer) {
assert(!_current_row.empty());
// versions in _current_row are not ordered but it is not a problem
// due to the fact that all rows are continuous.
for (version& v : _current_row) {
if (!v.can_move) {
consumer(deletable_row(v.current_row->row()));
} else {
consumer(std::move(v.current_row->row()));
}
}
}
void remove_current_row_when_possible() {
assert(!_current_row.empty());
auto deleter = current_deleter<rows_entry>();
for (version& v : _current_row) {
if (v.can_move) {
v.rows->erase_and_dispose(v.current_row, deleter);
}
}
}
void move_to_next_row() {
_current_row.clear();
while (!_heap.empty() &&
(_current_row.empty() || _rows_cmp(*_current_row[0].current_row, *_heap[0].current_row) == 0)) {
boost::range::pop_heap(_heap, _version_cmp);
auto& curr = _heap.back();
_current_row.push_back({curr.current_row, curr.rows, curr.can_move});
++curr.current_row;
if (curr.current_row == curr.rows->end()) {
_heap.pop_back();
} else {
boost::range::push_heap(_heap, _version_cmp);
}
}
}
};
namespace {
// When applying partition_entry to an incomplete partition_entry this class is used to represent
// the target incomplete partition_entry. It encapsulates the logic needed for handling multiple versions.
class apply_incomplete_target final {
struct version {
mutation_partition::rows_type::iterator current_row;
mutation_partition::rows_type* rows;
size_t version_no;
struct compare {
const rows_entry::tri_compare& _cmp;
public:
explicit compare(const rows_entry::tri_compare& cmp) : _cmp(cmp) { }
bool operator()(const version& a, const version& b) const {
auto res = _cmp(*a.current_row, *b.current_row);
return res > 0 || (res == 0 && a.version_no > b.version_no);
}
};
};
const schema& _schema;
partition_entry& _pe;
rows_entry::tri_compare _rows_cmp;
rows_entry::compare _rows_less_cmp;
version::compare _version_cmp;
std::vector<version> _heap;
mutation_partition::rows_type::iterator _next_in_latest_version;
public:
apply_incomplete_target(partition_entry& pe, const schema& schema)
: _schema(schema)
, _pe(pe)
, _rows_cmp(schema)
, _rows_less_cmp(schema)
, _version_cmp(_rows_cmp)
{
size_t version_no = 0;
_next_in_latest_version = pe.version()->partition().clustered_rows().begin();
for (auto&& v : pe.version()->elements_from_this()) {
if (!v.partition().clustered_rows().empty()) {
_heap.push_back({v.partition().clustered_rows().begin(), &v.partition().clustered_rows(), version_no});
}
++version_no;
}
boost::range::make_heap(_heap, _version_cmp);
}
// Applies the row from source.
// Must be called for rows with monotonic keys.
// Weak exception guarantees. The target and source partitions are left
// in a state such that the two still commute to the same value on retry.
void apply(partition_entry::rows_iterator& src) {
auto&& key = src.key();
while (!_heap.empty() && _rows_less_cmp(*_heap[0].current_row, key)) {
boost::range::pop_heap(_heap, _version_cmp);
auto& curr = _heap.back();
curr.current_row = curr.rows->lower_bound(key, _rows_less_cmp);
if (curr.version_no == 0) {
_next_in_latest_version = curr.current_row;
}
if (curr.current_row == curr.rows->end()) {
_heap.pop_back();
} else {
boost::range::push_heap(_heap, _version_cmp);
}
}
if (!_heap.empty()) {
rows_entry& next_row = *_heap[0].current_row;
if (_rows_cmp(key, next_row) == 0) {
if (next_row.dummy()) {
return;
}
} else if (!next_row.continuous()) {
return;
}
}
mutation_partition::rows_type& rows = _pe.version()->partition().clustered_rows();
if (_next_in_latest_version != rows.end() && _rows_cmp(key, *_next_in_latest_version) == 0) {
src.consume_row([&] (deletable_row&& row) {
_next_in_latest_version->row().apply(_schema, std::move(row));
});
} else {
auto e = current_allocator().construct<rows_entry>(key);
e->set_continuous(_heap.empty() ? is_continuous::yes : _heap[0].current_row->continuous());
rows.insert_before(_next_in_latest_version, *e);
src.consume_row([&] (deletable_row&& row) {
e->row().apply(_schema, std::move(row));
});
}
}
};
} // namespace
template<typename Func>
void partition_entry::with_detached_versions(Func&& func) {
@@ -342,6 +526,58 @@ void partition_entry::with_detached_versions(Func&& func) {
func(current);
}
void partition_entry::apply_to_incomplete(const schema& s, partition_entry&& pe, const schema& pe_schema)
{
if (s.version() != pe_schema.version()) {
partition_entry entry(pe.squashed(pe_schema.shared_from_this(), s.shared_from_this()));
entry.with_detached_versions([&] (partition_version* v) {
apply_to_incomplete(s, v);
});
} else {
pe.with_detached_versions([&](partition_version* v) {
apply_to_incomplete(s, v);
});
}
}
void partition_entry::apply_to_incomplete(const schema& s, partition_version* version) {
partition_version& dst = open_version(s);
bool can_move = true;
auto current = version;
bool static_row_continuous = dst.partition().static_row_continuous();
while (current) {
can_move &= !current->is_referenced();
dst.partition().apply(current->partition().partition_tombstone());
if (static_row_continuous) {
row& static_row = dst.partition().static_row();
if (can_move) {
static_row.apply(s, column_kind::static_column, std::move(current->partition().static_row()));
} else {
static_row.apply(s, column_kind::static_column, current->partition().static_row());
}
}
range_tombstone_list& tombstones = dst.partition().row_tombstones();
if (can_move) {
tombstones.apply_reversibly(s, current->partition().row_tombstones()).cancel();
} else {
tombstones.apply(s, current->partition().row_tombstones());
}
current = current->next();
}
partition_entry::rows_iterator source(version, s);
apply_incomplete_target target(*this, s);
while (!source.done()) {
if (!source.is_dummy()) {
target.apply(source);
}
source.remove_current_row_when_possible();
source.move_to_next_row();
}
}
mutation_partition partition_entry::squashed(schema_ptr from, schema_ptr to)
{
mutation_partition mp(to);

View File

@@ -225,6 +225,11 @@ public:
}
unsigned version_count();
bool at_latest_version() const {
return _entry != nullptr;
}
tombstone partition_tombstone() const;
row static_row() const;
mutation_partition squashed() const;
@@ -253,7 +258,9 @@ private:
void set_version(partition_version*);
void apply(const schema& s, partition_version* pv, const schema& pv_schema);
void apply_to_incomplete(const schema& s, partition_version* other);
public:
class rows_iterator;
partition_entry() = default;
explicit partition_entry(mutation_partition mp);
~partition_entry();
@@ -294,6 +301,25 @@ public:
// Assumes this instance and mpv are fully continuous.
void apply(const schema& s, mutation_partition_view mpv, const schema& mp_schema);
// Adds mutation_partition represented by "other" to the one represented
// by this entry.
//
// The argument must be fully-continuous.
//
// The rules of addition differ from that used by regular
// mutation_partition addition with regards to continuity. The continuity
// of the result is the same as in this instance. Information from "other"
// which is incomplete in this instance is dropped. In other words, this
// performs set intersection on continuity information, drops information
// which falls outside of the continuity range, and applies regular merging
// rules for the rest.
//
// Weak exception guarantees.
// If an exception is thrown this and pe will be left in some valid states
// such that if the operation is retried (possibly many times) and eventually
// succeeds the result will be as if the first attempt didn't fail.
void apply_to_incomplete(const schema& s, partition_entry&& pe, const schema& pe_schema);
// Weak exception guarantees.
// If an exception is thrown this and pe will be left in some valid states
// such that if the operation is retried (possibly many times) and eventually