Merge "Reduce the size of mutation_partition" from Piotr

"Reduce the size of mutation_partition by implementing intrusive set using
bi::rbtree_algorithms directly and using tree nodes optimized for size.

This will reduce the size of mutation_partition by:
24 bytes + <number of cql rows> * 8 bytes

This should have a positive impact on performance because mutation_partitions
are stored both in memtable and cache.

Fixes #742."

* 'haaawk/742' of github.com:cloudius-systems/seastar-dev:
  intrusive_set: rename size() to calculate_size()
  Make intrusive_set_external_comparator::_value_traits static
  Implement intrusive set using rbtree_algorithms
  mutation_partition: make apply_reversibly_intrusive_set nongeneric
  mutation_partition: take schema in find_row and clustered_row
  mutation_partition: Extract intrusive set logic to a class.
  mutation_partition: Replace value_comp with key_comp calls
This commit is contained in:
Avi Kivity
2017-01-05 17:34:10 +02:00
16 changed files with 345 additions and 111 deletions

View File

@@ -81,7 +81,7 @@ void update_statement::add_update_for_key(mutation& m, const exploded_clustering
// case empty prefix can only refer to the static row.
bool is_static_prefix = s->has_static_columns() && !prefix;
if (type == statement_type::INSERT && !is_static_prefix && s->is_cql3_table()) {
auto& row = m.partition().clustered_row(clustering_key::from_clustering_prefix(*s, prefix));
auto& row = m.partition().clustered_row(*s, clustering_key::from_clustering_prefix(*s, prefix));
row.apply(row_marker(params.timestamp(), params.ttl(), params.expiry()));
}
}

View File

@@ -556,11 +556,11 @@ column_family::find_partition_slow(schema_ptr s, const partition_key& key) const
future<column_family::const_row_ptr>
column_family::find_row(schema_ptr s, const dht::decorated_key& partition_key, clustering_key clustering_key) const {
return find_partition(std::move(s), partition_key).then([clustering_key = std::move(clustering_key)] (const_mutation_partition_ptr p) {
return find_partition(s, partition_key).then([clustering_key = std::move(clustering_key), s] (const_mutation_partition_ptr p) {
if (!p) {
return make_ready_future<const_row_ptr>();
}
auto r = p->find_row(clustering_key);
auto r = p->find_row(*s, clustering_key);
if (r) {
// FIXME: remove copy if only one data source
return make_ready_future<const_row_ptr>(std::make_unique<row>(*r));

View File

@@ -0,0 +1,224 @@
/*
* Copyright (C) 2016 ScyllaDB
*/
/*
* This file is part of Scylla.
*
* Scylla is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Scylla is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
*/
/*
* (C) Copyright Ion Gaztanaga 2013-2014
* Distributed under the Boost Software License, Version 1.0.
* (See accompanying file LICENSE_1_0.txt or copy at
* http://www.boost.org/LICENSE_1_0.txt)
*/
#pragma once
#include <boost/intrusive/set.hpp>
namespace bi = boost::intrusive;
typedef bi::rbtree_algorithms<bi::rbtree_node_traits<void*, true>> algo;
class intrusive_set_external_comparator_member_hook : public bi::set_member_hook<bi::optimize_size<true>> {
public:
intrusive_set_external_comparator_member_hook() = default;
intrusive_set_external_comparator_member_hook(intrusive_set_external_comparator_member_hook&& o) noexcept {
algo::replace_node(o.this_ptr(), this_ptr());
algo::init(o.this_ptr());
}
};
template<typename Elem,
intrusive_set_external_comparator_member_hook Elem::* PtrToMember>
class intrusive_set_external_comparator final {
typedef boost::intrusive::mhtraits<Elem, intrusive_set_external_comparator_member_hook, PtrToMember> value_traits;
typedef typename value_traits::node_traits node_traits;
typedef typename node_traits::node_ptr node_ptr;
public:
typedef Elem value_type;
typedef typename bi::tree_iterator<value_traits, false> iterator;
typedef typename bi::tree_iterator<value_traits, true> const_iterator;
typedef typename bi::reverse_iterator<iterator> reverse_iterator;
typedef typename bi::reverse_iterator<const_iterator> const_reverse_iterator;
private:
intrusive_set_external_comparator_member_hook _header;
static const value_traits _value_traits;
struct key_of_value {
typedef Elem type;
Elem& operator()(Elem& t) { return t; }
};
template <typename Comparator>
struct key_node_comparator {
Comparator _cmp;
const value_traits& _value_traits;
key_node_comparator(Comparator cmp, const value_traits& value_traits) : _cmp(cmp), _value_traits(value_traits) { }
bool operator()(const node_ptr& a, const node_ptr& b) {
return _cmp(*_value_traits.to_value_ptr(a), *_value_traits.to_value_ptr(b));
}
template <typename T1>
bool operator()(const node_ptr& a, const T1& b) {
return _cmp(*_value_traits.to_value_ptr(a), b);
}
template <typename T1>
bool operator()(const T1& a, const node_ptr& b) {
return _cmp(a, *_value_traits.to_value_ptr(b));
}
template <typename T1, typename T2>
bool operator()(const T1& a, const T2& b) {
return _cmp(a, b);
}
};
typedef typename bi::value_traits_pointers<value_traits>::const_value_traits_ptr const_value_traits_ptr;
const_value_traits_ptr priv_value_traits_ptr() const {
return bi::pointer_traits<const_value_traits_ptr>::pointer_to(_value_traits);
}
template <typename Comparator>
key_node_comparator<Comparator> key_node_comp(Comparator comp) const {
return key_node_comparator<Comparator>(comp, _value_traits);
}
iterator insert_unique_commit(Elem& value, const algo::insert_commit_data &commit_data) {
node_ptr to_insert(_value_traits.to_node_ptr(value));
algo::insert_unique_commit(_header.this_ptr(), to_insert, commit_data);
return iterator(to_insert, priv_value_traits_ptr());
}
public:
intrusive_set_external_comparator() { algo::init_header(_header.this_ptr()); }
intrusive_set_external_comparator(intrusive_set_external_comparator&& o) {
algo::swap_tree(_header.this_ptr(), node_ptr(o._header.this_ptr()));
}
iterator begin() { return iterator(algo::begin_node(_header.this_ptr()), priv_value_traits_ptr()); }
const_iterator begin() const { return const_iterator(algo::begin_node(_header.this_ptr()), priv_value_traits_ptr()); }
iterator end() { return iterator(algo::end_node(_header.this_ptr()), priv_value_traits_ptr()); }
const_iterator end() const { return const_iterator(algo::end_node(_header.this_ptr()), priv_value_traits_ptr()); }
reverse_iterator rbegin() { return reverse_iterator(end()); }
const_reverse_iterator rbegin() const { return const_reverse_iterator(end()); }
reverse_iterator rend() { return reverse_iterator(begin()); }
const_reverse_iterator rend() const { return const_reverse_iterator(begin()); }
template<class Disposer>
void clear_and_dispose(Disposer disposer) {
algo::clear_and_dispose(_header.this_ptr(),
[&disposer] (const node_ptr& p) {
disposer(_value_traits.to_value_ptr(p));
});
algo::init_header(_header.this_ptr());
}
bool empty() const { return algo::unique(_header.this_ptr()); }
// WARNING: this method has O(N) time complexity, use with care
auto calculate_size() const { return algo::size(_header.this_ptr()); }
iterator erase(const_iterator i) {
const_iterator ret(i);
++ret;
node_ptr to_erase(i.pointed_node());
algo::erase(_header.this_ptr(), to_erase);
algo::init(to_erase);
return ret.unconst();
}
iterator erase(const_iterator b, const_iterator e) {
while (b != e) {
erase(b++);
}
return b.unconst();
}
template<class Disposer>
iterator erase_and_dispose(const_iterator i, Disposer disposer) {
node_ptr to_erase(i.pointed_node());
iterator ret(erase(i));
disposer(_value_traits.to_value_ptr(to_erase));
return ret;
}
template<class Disposer>
iterator erase_and_dispose(const_iterator b, const_iterator e, Disposer disposer) {
while (b != e) {
erase_and_dispose(b++, disposer);
}
return b.unconst();
}
template <class Cloner, class Disposer>
void clone_from(const intrusive_set_external_comparator &src, Cloner cloner, Disposer disposer) {
clear_and_dispose(disposer);
if (!src.empty()) {
auto rollback = defer([this, &disposer] { this->clear_and_dispose(disposer); });
algo::clone(src._header.this_ptr(),
_header.this_ptr(),
[&cloner] (const node_ptr& p) {
return _value_traits.to_node_ptr(*cloner(*_value_traits.to_value_ptr(p)));
},
[&disposer] (const node_ptr& p) {
disposer(_value_traits.to_value_ptr(p));
});
rollback.cancel();
}
}
Elem* unlink_leftmost_without_rebalance() {
node_ptr to_be_disposed(algo::unlink_leftmost_without_rebalance(_header.this_ptr()));
if(!to_be_disposed)
return 0;
algo::init(to_be_disposed);
return _value_traits.to_value_ptr(to_be_disposed);
}
iterator insert_before(const_iterator pos, Elem& value) {
node_ptr to_insert(_value_traits.to_node_ptr(value));
return iterator(algo::insert_before(_header.this_ptr(), pos.pointed_node(), to_insert), priv_value_traits_ptr());
}
template<class KeyType, class KeyTypeKeyCompare>
iterator upper_bound(const KeyType& key, KeyTypeKeyCompare comp) {
return iterator(algo::upper_bound(_header.this_ptr(), key, key_node_comp(comp)), priv_value_traits_ptr());
}
template<class KeyType, class KeyTypeKeyCompare>
const_iterator upper_bound(const KeyType& key, KeyTypeKeyCompare comp) const {
return const_iterator(algo::upper_bound(_header.this_ptr(), key, key_node_comp(comp)), priv_value_traits_ptr());
}
template<class KeyType, class KeyTypeKeyCompare>
iterator lower_bound(const KeyType &key, KeyTypeKeyCompare comp) {
return iterator(algo::lower_bound(_header.this_ptr(), key, key_node_comp(comp)), priv_value_traits_ptr());
}
template<class KeyType, class KeyTypeKeyCompare>
const_iterator lower_bound(const KeyType &key, KeyTypeKeyCompare comp) const {
return const_iterator(algo::lower_bound(_header.this_ptr(), key, key_node_comp(comp)), priv_value_traits_ptr());
}
template<class KeyType, class KeyTypeKeyCompare>
iterator find(const KeyType &key, KeyTypeKeyCompare comp) {
return iterator(algo::find(_header.this_ptr(), key, key_node_comp(comp)), priv_value_traits_ptr());
}
template<class KeyType, class KeyTypeKeyCompare>
const_iterator find(const KeyType &key, KeyTypeKeyCompare comp) const {
return const_iterator(algo::find(_header.this_ptr(), key, key_node_comp(comp)), priv_value_traits_ptr());
}
template<class ElemCompare>
iterator insert(const_iterator hint, Elem& value, ElemCompare cmp) {
algo::insert_commit_data commit_data;
std::pair<node_ptr, bool> ret =
algo::insert_unique_check(_header.this_ptr(),
hint.pointed_node(),
key_of_value()(value),
key_node_comp(cmp),
commit_data);
return ret.second ? insert_unique_commit(value, commit_data)
: iterator(ret.first, priv_value_traits_ptr());
}
};
template<typename Elem,
intrusive_set_external_comparator_member_hook Elem::* PtrToMember>
const typename intrusive_set_external_comparator<Elem, PtrToMember>::value_traits intrusive_set_external_comparator<Elem, PtrToMember>::_value_traits;

View File

@@ -0,0 +1,23 @@
Boost Software License - Version 1.0 - August 17th, 2003
Permission is hereby granted, free of charge, to any person or organization
obtaining a copy of the software and accompanying documentation covered by
this license (the "Software") to use, reproduce, display, distribute,
execute, and transmit the Software, and to prepare derivative works of the
Software, and to permit third-parties to whom the Software is furnished to
do so, all subject to the following:
The copyright notices in the Software and this entire statement, including
the above license grant, this restriction and the following disclaimer,
must be included in all copies of the Software, in whole or in part, and
all derivative works of the Software, unless such copies or derivative
works are solely in the form of machine-executable object code generated by
a source language processor.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE, TITLE AND NON-INFRINGEMENT. IN NO EVENT
SHALL THE COPYRIGHT HOLDERS OR ANYONE DISTRIBUTING THE SOFTWARE BE LIABLE
FOR ANY DAMAGES OR OTHER LIABILITY, WHETHER IN CONTRACT, TORT OR OTHERWISE,
ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
DEALINGS IN THE SOFTWARE.

View File

@@ -62,7 +62,7 @@ void mutation::set_static_cell(const bytes& name, const data_value& value, api::
}
void mutation::set_clustered_cell(const exploded_clustering_prefix& prefix, const column_definition& def, atomic_cell_or_collection&& value) {
auto& row = partition().clustered_row(clustering_key::from_clustering_prefix(*schema(), prefix)).cells();
auto& row = partition().clustered_row(*schema(), clustering_key::from_clustering_prefix(*schema(), prefix)).cells();
row.apply(def, std::move(value));
}
@@ -76,7 +76,7 @@ void mutation::set_clustered_cell(const clustering_key& key, const bytes& name,
}
void mutation::set_clustered_cell(const clustering_key& key, const column_definition& def, atomic_cell_or_collection&& value) {
auto& row = partition().clustered_row(key).cells();
auto& row = partition().clustered_row(*schema(), key).cells();
row.apply(def, std::move(value));
}
@@ -108,7 +108,7 @@ mutation::get_cell(const clustering_key& rkey, const column_definition& def) con
}
return { *cell };
} else {
const row* r = partition().find_row(rkey);
const row* r = partition().find_row(*schema(), rkey);
if (!r) {
return {};
}
@@ -287,7 +287,7 @@ public:
if (!check_remaining_limit(cr)) {
return stop_iteration::yes;
}
auto& dr = _m.partition().clustered_row(std::move(cr.key()));
auto& dr = _m.partition().clustered_row(*_m.schema(), std::move(cr.key()));
dr.apply(cr.tomb());
dr.apply(cr.marker());
dr.cells().apply(*_m.schema(), column_kind::regular_column, std::move(cr.cells()));

View File

@@ -32,6 +32,7 @@
#include "mutation_query.hh"
#include "service/priority_manager.hh"
#include "mutation_compactor.hh"
#include "intrusive_set_external_comparator.hh"
template<bool reversed>
struct reversal_traits;
@@ -136,7 +137,7 @@ struct reversal_traits<true> {
//
// apply_reversibly_intrusive_set() and revert_intrusive_set() implement ReversiblyMergeable
// for a boost::intrusive_set<> container of ReversiblyMergeable entries.
// for a rows_type container of ReversiblyMergeable entries.
//
// See reversibly_mergeable.hh
//
@@ -158,38 +159,34 @@ struct reversal_traits<true> {
//
// revert for apply_reversibly_intrusive_set()
template<typename Container, typename Revert = default_reverter<typename Container::value_type>>
void revert_intrusive_set_range(Container& dst, Container& src,
typename Container::iterator start,
typename Container::iterator end,
Revert&& revert = Revert()) noexcept
void revert_intrusive_set_range(const schema& s, mutation_partition::rows_type& dst, mutation_partition::rows_type& src,
mutation_partition::rows_type::iterator start,
mutation_partition::rows_type::iterator end) noexcept
{
using value_type = typename Container::value_type;
auto deleter = current_deleter<value_type>();
auto deleter = current_deleter<rows_entry>();
while (start != end) {
auto& e = *start;
// lower_bound() can allocate if linearization is required but it should have
// been already performed by the lower_bound() invocation in apply_reversibly_intrusive_set() and
// stored in the linearization context.
auto i = dst.find(e);
auto i = dst.find(e, rows_entry::compare(s));
assert(i != dst.end());
value_type& dst_e = *i;
rows_entry& dst_e = *i;
if (e.empty()) {
dst.erase(i);
start = src.erase_and_dispose(start, deleter);
start = src.insert_before(start, dst_e);
} else {
revert(dst_e, e);
dst_e.revert(s, e);
}
++start;
}
}
template<typename Container, typename Revert = default_reverter<typename Container::value_type>>
void revert_intrusive_set(Container& dst, Container& src, Revert&& revert = Revert()) noexcept {
revert_intrusive_set_range(dst, src, src.begin(), src.end(), std::forward<Revert>(revert));
void revert_intrusive_set(const schema& s, mutation_partition::rows_type& dst, mutation_partition::rows_type& src) noexcept {
revert_intrusive_set_range(s, dst, src, src.begin(), src.end());
}
// Applies src onto dst. See comment above revert_intrusive_set_range() for more details.
@@ -197,41 +194,38 @@ void revert_intrusive_set(Container& dst, Container& src, Revert&& revert = Reve
// Returns an object which upon going out of scope, unless cancel() is called on it,
// reverts the applicaiton by calling revert_intrusive_set(). The references to containers
// must be stable as long as the returned object is live.
template<typename Container,
typename Apply = default_reversible_applier<typename Container::value_type>,
typename Revert = default_reverter<typename Container::value_type>>
auto apply_reversibly_intrusive_set(Container& dst, Container& src, Apply&& apply = Apply(), Revert&& revert = Revert()) {
using value_type = typename Container::value_type;
auto apply_reversibly_intrusive_set(const schema& s, mutation_partition::rows_type& dst, mutation_partition::rows_type& src) {
auto src_i = src.begin();
try {
rows_entry::compare cmp(s);
while (src_i != src.end()) {
value_type& src_e = *src_i;
rows_entry& src_e = *src_i;
// neutral entries will be given special meaning for the purpose of revert, so
// get rid of empty rows from the input as if they were not there. This doesn't change
// the value of src.
if (src_e.empty()) {
src_i = src.erase_and_dispose(src_i, current_deleter<value_type>());
src_i = src.erase_and_dispose(src_i, current_deleter<rows_entry>());
continue;
}
auto i = dst.lower_bound(src_e);
if (i == dst.end() || dst.key_comp()(src_e, *i)) {
auto i = dst.lower_bound(src_e, cmp);
if (i == dst.end() || cmp(src_e, *i)) {
// Construct neutral entry which will represent missing dst entry for revert.
value_type* empty_e = current_allocator().construct<value_type>(src_e.key());
rows_entry* empty_e = current_allocator().construct<rows_entry>(src_e.key());
[&] () noexcept {
src_i = src.erase(src_i);
src_i = src.insert_before(src_i, *empty_e);
dst.insert_before(i, src_e);
}();
} else {
apply(*i, src_e);
i->apply_reversibly(s, src_e);
}
++src_i;
}
return defer([&dst, &src, revert] { revert_intrusive_set(dst, src, revert); });
return defer([&s, &dst, &src] { revert_intrusive_set(s, dst, src); });
} catch (...) {
revert_intrusive_set_range(dst, src, src.begin(), src_i, revert);
revert_intrusive_set_range(s, dst, src, src.begin(), src_i);
throw;
}
}
@@ -239,7 +233,7 @@ auto apply_reversibly_intrusive_set(Container& dst, Container& src, Apply&& appl
mutation_partition::mutation_partition(const mutation_partition& x)
: _tombstone(x._tombstone)
, _static_row(x._static_row)
, _rows(x._rows.value_comp())
, _rows()
, _row_tombstones(x._row_tombstones) {
auto cloner = [] (const auto& x) {
return current_allocator().construct<std::remove_const_t<std::remove_reference_t<decltype(x)>>>(x);
@@ -251,12 +245,12 @@ mutation_partition::mutation_partition(const mutation_partition& x, const schema
query::clustering_key_filter_ranges ck_ranges)
: _tombstone(x._tombstone)
, _static_row(x._static_row)
, _rows(x._rows.value_comp())
, _rows()
, _row_tombstones(x._row_tombstones) {
try {
for(auto&& r : ck_ranges) {
for (const rows_entry& e : x.range(schema, r)) {
_rows.push_back(*current_allocator().construct<rows_entry>(e));
_rows.insert(_rows.end(), *current_allocator().construct<rows_entry>(e), rows_entry::compare(schema));
}
}
} catch (...) {
@@ -352,9 +346,7 @@ mutation_partition::apply(const schema& s, mutation_partition&& p) {
_static_row.revert(s, column_kind::static_column, p._static_row);
});
auto revert_rows = apply_reversibly_intrusive_set(_rows, p._rows,
[&s] (rows_entry& dst, rows_entry& src) { dst.apply_reversibly(s, src); },
[&s] (rows_entry& dst, rows_entry& src) noexcept { dst.revert(s, src); });
auto revert_rows = apply_reversibly_intrusive_set(s, _rows, p._rows);
_tombstone.apply(p._tombstone); // noexcept
@@ -457,17 +449,17 @@ mutation_partition::apply_insert(const schema& s, clustering_key_view key, api::
void mutation_partition::insert_row(const schema& s, const clustering_key& key, deletable_row&& row) {
auto e = current_allocator().construct<rows_entry>(key, std::move(row));
_rows.insert(_rows.end(), *e);
_rows.insert(_rows.end(), *e, rows_entry::compare(s));
}
void mutation_partition::insert_row(const schema& s, const clustering_key& key, const deletable_row& row) {
auto e = current_allocator().construct<rows_entry>(key, row);
_rows.insert(_rows.end(), *e);
_rows.insert(_rows.end(), *e, rows_entry::compare(s));
}
const row*
mutation_partition::find_row(const clustering_key& key) const {
auto i = _rows.find(key);
mutation_partition::find_row(const schema& s, const clustering_key& key) const {
auto i = _rows.find(key, rows_entry::compare(s));
if (i == _rows.end()) {
return nullptr;
}
@@ -475,22 +467,22 @@ mutation_partition::find_row(const clustering_key& key) const {
}
deletable_row&
mutation_partition::clustered_row(clustering_key&& key) {
auto i = _rows.find(key);
mutation_partition::clustered_row(const schema& s, clustering_key&& key) {
auto i = _rows.find(key, rows_entry::compare(s));
if (i == _rows.end()) {
auto e = current_allocator().construct<rows_entry>(std::move(key));
_rows.insert(i, *e);
_rows.insert(i, *e, rows_entry::compare(s));
return e->row();
}
return i->row();
}
deletable_row&
mutation_partition::clustered_row(const clustering_key& key) {
auto i = _rows.find(key);
mutation_partition::clustered_row(const schema& s, const clustering_key& key) {
auto i = _rows.find(key, rows_entry::compare(s));
if (i == _rows.end()) {
auto e = current_allocator().construct<rows_entry>(key);
_rows.insert(i, *e);
_rows.insert(i, *e, rows_entry::compare(s));
return e->row();
}
return i->row();
@@ -501,7 +493,7 @@ mutation_partition::clustered_row(const schema& s, const clustering_key_view& ke
auto i = _rows.find(key, rows_entry::compare(s));
if (i == _rows.end()) {
auto e = current_allocator().construct<rows_entry>(key);
_rows.insert(i, *e);
_rows.insert(i, *e, rows_entry::compare(s));
return e->row();
}
return i->row();
@@ -1302,13 +1294,10 @@ mutation_partition::live_row_count(const schema& s, gc_clock::time_point query_t
}
rows_entry::rows_entry(rows_entry&& o) noexcept
: _key(std::move(o._key))
: _link(std::move(o._link))
, _key(std::move(o._key))
, _row(std::move(o._row))
{
using container_type = mutation_partition::rows_type;
container_type::node_algorithms::replace_node(o._link.this_ptr(), _link.this_ptr());
container_type::node_algorithms::init(o._link.this_ptr());
}
{ }
row::row(const row& o)
: _type(o._type)

View File

@@ -41,6 +41,7 @@
#include "hashing_partition_visitor.hh"
#include "range_tombstone_list.hh"
#include "clustering_key_filter.hh"
#include "intrusive_set_external_comparator.hh"
//
// Container for cells of a row. Cells are identified by column_id.
@@ -438,7 +439,7 @@ public:
};
class rows_entry {
boost::intrusive::set_member_hook<> _link;
intrusive_set_external_comparator_member_hook _link;
clustering_key _key;
deletable_row _row;
friend class mutation_partition;
@@ -534,10 +535,7 @@ class serializer;
class mutation_partition final {
public:
// FIXME: using boost::intrusive because gcc's std::set<> does not support heterogeneous lookup yet
using rows_type = boost::intrusive::set<rows_entry,
boost::intrusive::member_hook<rows_entry, boost::intrusive::set_member_hook<>, &rows_entry::_link>,
boost::intrusive::compare<rows_entry::compare>>;
using rows_type = intrusive_set_external_comparator<rows_entry, &rows_entry::_link>;
friend class rows_entry;
friend class size_calculator;
private:
@@ -555,11 +553,11 @@ private:
public:
struct copy_comparators_only {};
mutation_partition(schema_ptr s)
: _rows(rows_entry::compare(*s))
: _rows()
, _row_tombstones(*s)
{ }
mutation_partition(mutation_partition& other, copy_comparators_only)
: _rows(other._rows.key_comp())
: _rows()
, _row_tombstones(other._row_tombstones, range_tombstone_list::copy_comparator_only())
{ }
mutation_partition(mutation_partition&&) = default;
@@ -673,8 +671,8 @@ public:
// Returns true if there is no live data or tombstones.
bool empty() const;
public:
deletable_row& clustered_row(const clustering_key& key);
deletable_row& clustered_row(clustering_key&& key);
deletable_row& clustered_row(const schema& s, const clustering_key& key);
deletable_row& clustered_row(const schema& s, clustering_key&& key);
deletable_row& clustered_row(const schema& s, const clustering_key_view& key);
public:
tombstone partition_tombstone() const { return _tombstone; }
@@ -685,7 +683,7 @@ public:
const range_tombstone_list& row_tombstones() const { return _row_tombstones; }
rows_type& clustered_rows() { return _rows; }
range_tombstone_list& row_tombstones() { return _row_tombstones; }
const row* find_row(const clustering_key& key) const;
const row* find_row(const schema& s, const clustering_key& key) const;
tombstone range_tombstone_for_row(const schema& schema, const clustering_key& key) const;
tombstone tombstone_for_row(const schema& schema, const clustering_key& key) const;
tombstone tombstone_for_row(const schema& schema, const rows_entry& e) const;

View File

@@ -194,7 +194,7 @@ public:
return cf.find_partition_slow(schema, pkey)
.then([schema, ckey, column_name, exp] (column_family::const_mutation_partition_ptr p) {
assert(p != nullptr);
auto row = p->find_row(ckey);
auto row = p->find_row(*schema, ckey);
assert(row != nullptr);
auto col_def = schema->get_column_definition(utf8_type->decompose(column_name));
assert(col_def != nullptr);

View File

@@ -465,7 +465,7 @@ public:
size_t row_count = row_count_dist(_gen);
for (size_t i = 0; i < row_count; ++i) {
auto ckey = clustering_key::from_exploded(*_schema, {random_blob(), random_blob()});
deletable_row& row = m.partition().clustered_row(ckey);
deletable_row& row = m.partition().clustered_row(*_schema, ckey);
set_random_cells(row.cells(), column_kind::regular_column);
row.marker() = random_row_marker();
}

View File

@@ -97,7 +97,7 @@ SEASTAR_TEST_CASE(test_mutation_is_applied) {
mt->apply(std::move(m));
auto p = get_partition(*mt, key);
row& r = p.clustered_row(c_key).cells();
row& r = p.clustered_row(*s, c_key).cells();
auto i = r.find_cell(r1_col.id);
BOOST_REQUIRE(i);
auto cell = i->as_atomic_cell();
@@ -734,7 +734,7 @@ SEASTAR_TEST_CASE(test_marker_apply) {
auto mutation_with_marker = [&] (row_marker rm) {
mutation m(pkey, s);
m.partition().clustered_row(ckey).marker() = rm;
m.partition().clustered_row(*s, ckey).marker() = rm;
return m;
};
@@ -743,14 +743,14 @@ SEASTAR_TEST_CASE(test_marker_apply) {
auto marker = row_marker(api::new_timestamp());
auto mm = mutation_with_marker(marker);
m.apply(mm);
BOOST_REQUIRE_EQUAL(m.partition().clustered_row(ckey).marker(), marker);
BOOST_REQUIRE_EQUAL(m.partition().clustered_row(*s, ckey).marker(), marker);
}
{
mutation m(pkey, s);
auto marker = row_marker(api::new_timestamp(), std::chrono::seconds(1), gc_clock::now());
m.apply(mutation_with_marker(marker));
BOOST_REQUIRE_EQUAL(m.partition().clustered_row(ckey).marker(), marker);
BOOST_REQUIRE_EQUAL(m.partition().clustered_row(*s, ckey).marker(), marker);
}
return make_ready_future<>();
@@ -886,7 +886,7 @@ SEASTAR_TEST_CASE(test_mutation_diff) {
m1.set_clustered_cell(ckey1, *s->get_column_definition("v2"),
atomic_cell::make_live(2, bytes_type->decompose(data_value(bytes("v2:value2")))));
m1.partition().clustered_row(ckey2).apply(row_marker(3));
m1.partition().clustered_row(*s, ckey2).apply(row_marker(3));
m1.set_clustered_cell(ckey2, *s->get_column_definition("v2"),
atomic_cell::make_live(2, bytes_type->decompose(data_value(bytes("v2:value4")))));
map_type_impl::mutation mset1 {{}, {{int32_type->decompose(1), make_atomic_cell({})}, {int32_type->decompose(2), make_atomic_cell({})}}};
@@ -926,10 +926,10 @@ SEASTAR_TEST_CASE(test_mutation_diff) {
auto m2_1 = m2.partition().difference(s, m1.partition());
BOOST_REQUIRE_EQUAL(m2_1.partition_tombstone(), tombstone());
BOOST_REQUIRE(!m2_1.static_row().size());
BOOST_REQUIRE(!m2_1.find_row(ckey1));
BOOST_REQUIRE(m2_1.find_row(ckey2));
BOOST_REQUIRE(m2_1.find_row(ckey2)->find_cell(2));
auto cmv = m2_1.find_row(ckey2)->find_cell(2)->as_collection_mutation();
BOOST_REQUIRE(!m2_1.find_row(*s, ckey1));
BOOST_REQUIRE(m2_1.find_row(*s, ckey2));
BOOST_REQUIRE(m2_1.find_row(*s, ckey2)->find_cell(2));
auto cmv = m2_1.find_row(*s, ckey2)->find_cell(2)->as_collection_mutation();
auto cm = my_set_type->deserialize_mutation_form(cmv);
BOOST_REQUIRE(cm.cells.size() == 1);
BOOST_REQUIRE(cm.cells.front().first == int32_type->decompose(3));
@@ -941,12 +941,12 @@ SEASTAR_TEST_CASE(test_mutation_diff) {
auto m1_2 = m1.partition().difference(s, m2.partition());
BOOST_REQUIRE_EQUAL(m1_2.partition_tombstone(), m12.partition().partition_tombstone());
BOOST_REQUIRE(m1_2.find_row(ckey1));
BOOST_REQUIRE(m1_2.find_row(ckey2));
BOOST_REQUIRE(!m1_2.find_row(ckey1)->find_cell(1));
BOOST_REQUIRE(!m1_2.find_row(ckey2)->find_cell(0));
BOOST_REQUIRE(!m1_2.find_row(ckey2)->find_cell(1));
cmv = m1_2.find_row(ckey2)->find_cell(2)->as_collection_mutation();
BOOST_REQUIRE(m1_2.find_row(*s, ckey1));
BOOST_REQUIRE(m1_2.find_row(*s, ckey2));
BOOST_REQUIRE(!m1_2.find_row(*s, ckey1)->find_cell(1));
BOOST_REQUIRE(!m1_2.find_row(*s, ckey2)->find_cell(0));
BOOST_REQUIRE(!m1_2.find_row(*s, ckey2)->find_cell(1));
cmv = m1_2.find_row(*s, ckey2)->find_cell(2)->as_collection_mutation();
cm = my_set_type->deserialize_mutation_form(cmv);
BOOST_REQUIRE(cm.cells.size() == 1);
BOOST_REQUIRE(cm.cells.front().first == int32_type->decompose(2));
@@ -1175,7 +1175,7 @@ SEASTAR_TEST_CASE(test_mutation_upgrade) {
m.upgrade(s2);
mutation m2(pk, s2);
m2.partition().clustered_row(ckey1);
m2.partition().clustered_row(*s2, ckey1);
assert_that(m).is_equal_to(m2);
}

View File

@@ -168,7 +168,7 @@ public:
if (!m) {
*done = true;
} else {
auto row = m->partition().find_row(clustering_key::make_empty());
auto row = m->partition().find_row(*s, clustering_key::make_empty());
if (!row || row->size() != _cfg.num_columns) {
throw std::invalid_argument("Invalid sstable found. Maybe you ran write mode with different num_columns settings?");
} else {

View File

@@ -793,8 +793,8 @@ SEASTAR_TEST_CASE(datafile_generation_11) {
auto verifier = [s, set_col, c_key] (auto& mutation) {
auto& mp = mutation->partition();
BOOST_REQUIRE(mp.clustered_rows().size() == 1);
auto r = mp.find_row(c_key);
BOOST_REQUIRE(mp.clustered_rows().calculate_size() == 1);
auto r = mp.find_row(*s, c_key);
BOOST_REQUIRE(r);
BOOST_REQUIRE(r->size() == 1);
auto cell = r->find_cell(set_col.id);
@@ -1123,7 +1123,7 @@ SEASTAR_TEST_CASE(compact) {
BOOST_REQUIRE(m->key().equal(*s, partition_key::from_singular(*s, data_value(sstring("jerry")))));
BOOST_REQUIRE(!m->partition().partition_tombstone());
auto &rows = m->partition().clustered_rows();
BOOST_REQUIRE(rows.size() == 1);
BOOST_REQUIRE(rows.calculate_size() == 1);
auto &row = rows.begin()->row();
BOOST_REQUIRE(!row.deleted_at());
auto &cells = row.cells();
@@ -1137,7 +1137,7 @@ SEASTAR_TEST_CASE(compact) {
BOOST_REQUIRE(m->key().equal(*s, partition_key::from_singular(*s, data_value(sstring("tom")))));
BOOST_REQUIRE(!m->partition().partition_tombstone());
auto &rows = m->partition().clustered_rows();
BOOST_REQUIRE(rows.size() == 1);
BOOST_REQUIRE(rows.calculate_size() == 1);
auto &row = rows.begin()->row();
BOOST_REQUIRE(!row.deleted_at());
auto &cells = row.cells();
@@ -1151,7 +1151,7 @@ SEASTAR_TEST_CASE(compact) {
BOOST_REQUIRE(m->key().equal(*s, partition_key::from_singular(*s, data_value(sstring("john")))));
BOOST_REQUIRE(!m->partition().partition_tombstone());
auto &rows = m->partition().clustered_rows();
BOOST_REQUIRE(rows.size() == 1);
BOOST_REQUIRE(rows.calculate_size() == 1);
auto &row = rows.begin()->row();
BOOST_REQUIRE(!row.deleted_at());
auto &cells = row.cells();
@@ -1165,7 +1165,7 @@ SEASTAR_TEST_CASE(compact) {
BOOST_REQUIRE(m->key().equal(*s, partition_key::from_singular(*s, data_value(sstring("nadav")))));
BOOST_REQUIRE(m->partition().partition_tombstone());
auto &rows = m->partition().clustered_rows();
BOOST_REQUIRE(rows.size() == 0);
BOOST_REQUIRE(rows.calculate_size() == 0);
return (*reader)();
}).then([reader] (streamed_mutation_opt m) {
BOOST_REQUIRE(!m);
@@ -1403,7 +1403,7 @@ SEASTAR_TEST_CASE(datafile_generation_37) {
auto exploded = exploded_clustering_prefix({"cl1"});
auto clustering = clustering_key::from_clustering_prefix(*s, exploded);
auto row = mp.clustered_row(clustering);
auto row = mp.clustered_row(*s, clustering);
match_live_cell(row.cells(), *s, "cl2", data_value(to_bytes("cl2")));
return make_ready_future<>();
});
@@ -1440,7 +1440,7 @@ SEASTAR_TEST_CASE(datafile_generation_38) {
auto exploded = exploded_clustering_prefix({"cl1", "cl2"});
auto clustering = clustering_key::from_clustering_prefix(*s, exploded);
auto row = mp.clustered_row(clustering);
auto row = mp.clustered_row(*s, clustering);
match_live_cell(row.cells(), *s, "cl3", data_value(to_bytes("cl3")));
return make_ready_future<>();
});
@@ -1475,7 +1475,7 @@ SEASTAR_TEST_CASE(datafile_generation_39) {
return mutation_from_streamed_mutation(std::move(sm));
}).then([sstp, s] (auto mutation) {
auto& mp = mutation->partition();
auto row = mp.clustered_row(clustering_key::make_empty());
auto row = mp.clustered_row(*s, clustering_key::make_empty());
match_live_cell(row.cells(), *s, "cl1", data_value(data_value(to_bytes("cl1"))));
match_live_cell(row.cells(), *s, "cl2", data_value(data_value(to_bytes("cl2"))));
return make_ready_future<>();
@@ -1572,7 +1572,7 @@ SEASTAR_TEST_CASE(datafile_generation_41) {
return mutation_from_streamed_mutation(std::move(sm));
}).then([sstp, s, tomb] (auto mutation) {
auto& mp = mutation->partition();
BOOST_REQUIRE(mp.clustered_rows().size() == 1);
BOOST_REQUIRE(mp.clustered_rows().calculate_size() == 1);
auto c_row = *(mp.clustered_rows().begin());
BOOST_REQUIRE(c_row.row().deleted_at() == tomb);
});
@@ -2302,7 +2302,7 @@ SEASTAR_TEST_CASE(check_multi_schema) {
BOOST_REQUIRE(m);
BOOST_REQUIRE(m->key().equal(*s, partition_key::from_singular(*s, 0)));
auto& rows = m->partition().clustered_rows();
BOOST_REQUIRE_EQUAL(rows.size(), 1);
BOOST_REQUIRE_EQUAL(rows.calculate_size(), 1);
auto& row = rows.begin()->row();
BOOST_REQUIRE(!row.deleted_at());
auto& cells = row.cells();

View File

@@ -138,7 +138,7 @@ future<mutation> generate_clustered(bytes&& key) {
inline auto clustered_row(mutation& mutation, const schema& s, std::vector<bytes>&& v) {
auto exploded = exploded_clustering_prefix(std::move(v));
auto clustering_pair = clustering_key::from_clustering_prefix(s, exploded);
return mutation.partition().clustered_row(clustering_pair);
return mutation.partition().clustered_row(s, clustering_pair);
}
SEASTAR_TEST_CASE(complex_sst1_k1) {
@@ -437,7 +437,7 @@ SEASTAR_TEST_CASE(compact_storage_sparse_read) {
return mutation_from_streamed_mutation(std::move(sm));
}).then([sstp, s, &key] (auto mutation) {
auto& mp = mutation->partition();
auto row = mp.clustered_row(clustering_key::make_empty());
auto row = mp.clustered_row(*s, clustering_key::make_empty());
match_live_cell(row.cells(), *s, "cl1", data_value(to_bytes("cl1")));
match_live_cell(row.cells(), *s, "cl2", data_value(to_bytes("cl2")));
return make_ready_future<>();
@@ -458,7 +458,7 @@ SEASTAR_TEST_CASE(compact_storage_simple_dense_read) {
auto exploded = exploded_clustering_prefix({"cl1"});
auto clustering = clustering_key::from_clustering_prefix(*s, exploded);
auto row = mp.clustered_row(clustering);
auto row = mp.clustered_row(*s, clustering);
match_live_cell(row.cells(), *s, "cl2", data_value(to_bytes("cl2")));
return make_ready_future<>();
});
@@ -478,7 +478,7 @@ SEASTAR_TEST_CASE(compact_storage_dense_read) {
auto exploded = exploded_clustering_prefix({"cl1", "cl2"});
auto clustering = clustering_key::from_clustering_prefix(*s, exploded);
auto row = mp.clustered_row(clustering);
auto row = mp.clustered_row(*s, clustering);
match_live_cell(row.cells(), *s, "cl3", data_value(to_bytes("cl3")));
return make_ready_future<>();
});
@@ -505,10 +505,10 @@ SEASTAR_TEST_CASE(broken_ranges_collection) {
if (!mut) {
return stop_iteration::yes;
} else if (key_equal("127.0.0.1")) {
auto row = mut->partition().clustered_row(clustering_key::make_empty());
auto row = mut->partition().clustered_row(*s, clustering_key::make_empty());
match_absent(row.cells(), *s, "tokens");
} else if (key_equal("127.0.0.3")) {
auto row = mut->partition().clustered_row(clustering_key::make_empty());
auto row = mut->partition().clustered_row(*s, clustering_key::make_empty());
auto tokens = match_collection(row.cells(), *s, "tokens", tombstone(deletion_time{0x55E5F2D5, 0x051EB3FC99715Dl }));
match_collection_element<status::live>(tokens.cells[0], to_bytes("-8180144272884242102"), bytes_opt{});
} else {
@@ -601,7 +601,7 @@ SEASTAR_TEST_CASE(tombstone_in_tombstone) {
bound_kind::incl_end,
tombstone(1459334681228103LL, it->tomb.deletion_time))));
auto& rows = mut->partition().clustered_rows();
BOOST_REQUIRE(rows.size() == 1);
BOOST_REQUIRE(rows.calculate_size() == 1);
for (auto e : rows) {
BOOST_REQUIRE(e.key().equal(*s, make_ckey("aaa", "bbb")));
BOOST_REQUIRE(e.row().deleted_at().timestamp == 1459334681244989LL);
@@ -677,7 +677,7 @@ SEASTAR_TEST_CASE(range_tombstone_reading) {
bound_kind::incl_end,
tombstone(1459334681228103LL, it->tomb.deletion_time))));
auto& rows = mut->partition().clustered_rows();
BOOST_REQUIRE(rows.size() == 0);
BOOST_REQUIRE(rows.calculate_size() == 0);
return stop_iteration::no;
});
});
@@ -783,7 +783,7 @@ SEASTAR_TEST_CASE(tombstone_in_tombstone2) {
++it;
BOOST_REQUIRE(it == rts.end());
BOOST_REQUIRE(rows.size() == 1);
BOOST_REQUIRE(rows.calculate_size() == 1);
for (auto e : rows) {
BOOST_REQUIRE(e.key().equal(*s, make_ckey("aaa", "bbb", "ccc")));
BOOST_REQUIRE(e.row().deleted_at().timestamp == 1459438519958850LL);

View File

@@ -1213,7 +1213,7 @@ SEASTAR_TEST_CASE(promoted_index_write) {
for (char i = 'a'; i <= 'z'; i++) {
for (char j = 'A'; j <= 'Z'; j++) {
for (int k = 0; k < 20; k++) {
auto& row = m.partition().clustered_row(
auto& row = m.partition().clustered_row(*s,
clustering_key::from_exploded(
*s, {to_bytes(sprint("%d%c%c", k, i, j))}));
row.cells().apply(*col,

View File

@@ -144,7 +144,7 @@ SEASTAR_TEST_CASE(test_fragmenting_and_freezing_streamed_mutations) {
return make_ready_future<>();
}, 1).get0();
auto expected_fragments = m.partition().clustered_rows().size()
auto expected_fragments = m.partition().clustered_rows().calculate_size()
+ m.partition().row_tombstones().size()
+ !m.partition().static_row().empty();
BOOST_REQUIRE_EQUAL(fms.size(), std::max(expected_fragments, size_t(1)));

View File

@@ -269,7 +269,7 @@ mutation trace_keyspace_helper::make_session_mutation(const one_session_records&
const session_record& record = session_records.session_rec;
auto timestamp = api::new_timestamp();
mutation m(key, schema);
auto& cells = m.partition().clustered_row(clustering_key::make_empty(*schema)).cells();
auto& cells = m.partition().clustered_row(*schema, clustering_key::make_empty(*schema)).cells();
cells.apply(*_client_column, atomic_cell::make_live(timestamp, inet_addr_type->decompose(record.client.addr()), ttl));
cells.apply(*_coordinator_column, atomic_cell::make_live(timestamp, inet_addr_type->decompose(utils::fb_utilities::get_broadcast_address().addr()), ttl));
@@ -306,7 +306,7 @@ mutation trace_keyspace_helper::make_slow_query_mutation(const one_session_recor
full_components.reserve(2);
full_components.emplace_back(inet_addr_type->decompose(utils::fb_utilities::get_broadcast_address().addr()));
full_components.emplace_back(int32_type->decompose((int32_t)(engine().cpu_id())));
auto& cells = m.partition().clustered_row(clustering_key::from_exploded(*schema, full_components)).cells();
auto& cells = m.partition().clustered_row(*schema, clustering_key::from_exploded(*schema, full_components)).cells();
// the corresponding tracing session ID
cells.apply(*_slow_session_id_column, atomic_cell::make_live(timestamp, uuid_type->decompose(session_records.session_id), ttl));
@@ -364,7 +364,7 @@ mutation trace_keyspace_helper::make_event_mutation(one_session_records& session
int64_t& last_event_nanos = backend_state_ptr->last_nanos;
auto timestamp = api::new_timestamp();
mutation m(key, schema);
auto& cells = m.partition().clustered_row(clustering_key::from_singular(*schema, utils::UUID_gen::get_time_UUID(make_monotonic_UUID_tp(last_event_nanos, record.event_time_point)))).cells();
auto& cells = m.partition().clustered_row(*schema, clustering_key::from_singular(*schema, utils::UUID_gen::get_time_UUID(make_monotonic_UUID_tp(last_event_nanos, record.event_time_point)))).cells();
cells.apply(*_activity_column, atomic_cell::make_live(timestamp, utf8_type->decompose(record.message), ttl));
cells.apply(*_source_column, atomic_cell::make_live(timestamp, inet_addr_type->decompose(utils::fb_utilities::get_broadcast_address().addr()), ttl));