treewide: require type info for copying atomic_cell_or_collection

This commit is contained in:
Paweł Dziepak
2017-10-04 14:55:59 +01:00
parent e9d6fc48ac
commit 27014a23d7
26 changed files with 151 additions and 130 deletions

View File

@@ -35,12 +35,17 @@ private:
atomic_cell_or_collection(managed_bytes&& data) : _data(std::move(data)) {}
public:
atomic_cell_or_collection() = default;
atomic_cell_or_collection(atomic_cell_or_collection&&) = default;
atomic_cell_or_collection(const atomic_cell_or_collection&) = delete;
atomic_cell_or_collection& operator=(atomic_cell_or_collection&&) = default;
atomic_cell_or_collection& operator=(const atomic_cell_or_collection&) = delete;
atomic_cell_or_collection(atomic_cell ac) : _data(std::move(ac._data)) {}
static atomic_cell_or_collection from_atomic_cell(atomic_cell data) { return { std::move(data._data) }; }
atomic_cell_view as_atomic_cell(const column_definition&) const { return atomic_cell_view::from_bytes(_data); }
atomic_cell_ref as_atomic_cell_ref(const column_definition&) { return { _data }; }
atomic_cell_mutable_view as_mutable_atomic_cell(const column_definition&) { return atomic_cell_mutable_view::from_bytes(_data); }
atomic_cell_or_collection(collection_mutation cm) : _data(std::move(cm.data)) {}
atomic_cell_or_collection copy(const abstract_type&) const { return managed_bytes(_data); }
explicit operator bool() const {
return !_data.empty();
}

View File

@@ -365,7 +365,7 @@ bool cache_flat_mutation_reader::ensure_population_lower_bound() {
rows_entry::compare less(*_schema);
// FIXME: Avoid the copy by inserting an incomplete clustering row
auto e = alloc_strategy_unique_ptr<rows_entry>(
current_allocator().construct<rows_entry>(*_last_row));
current_allocator().construct<rows_entry>(*_schema, *_last_row));
e->set_continuous(false);
auto insert_result = rows.insert_check(rows.end(), *e, less);
auto inserted = insert_result.second;
@@ -418,7 +418,7 @@ void cache_flat_mutation_reader::maybe_add_to_cache(const clustering_row& cr) {
cr.cells().prepare_hash(*_schema, column_kind::regular_column);
}
auto new_entry = alloc_strategy_unique_ptr<rows_entry>(
current_allocator().construct<rows_entry>(cr.key(), cr.tomb(), cr.marker(), cr.cells()));
current_allocator().construct<rows_entry>(*_schema, cr.key(), cr.tomb(), cr.marker(), cr.cells()));
new_entry->set_continuous(false);
auto it = _next_row.iterators_valid() ? _next_row.get_iterator_in_latest_version()
: mp.clustered_rows().lower_bound(cr.key(), less);

View File

@@ -634,7 +634,7 @@ column_family::find_row(schema_ptr s, const dht::decorated_key& partition_key, c
auto r = p->find_row(*s, clustering_key);
if (r) {
// FIXME: remove copy if only one data source
return make_ready_future<const_row_ptr>(std::make_unique<row>(*r));
return make_ready_future<const_row_ptr>(std::make_unique<row>(*s, column_kind::regular_column, *r));
} else {
return make_ready_future<const_row_ptr>();
}

View File

@@ -367,7 +367,7 @@ void view_updates::create_entry(const partition_key& base_key, const clustering_
auto marker = compute_row_marker(update);
r.apply(marker);
r.apply(update.tomb());
add_cells_to_view(*_base, *_view, update.cells(), r.cells());
add_cells_to_view(*_base, *_view, row(*_base, column_kind::regular_column, update.cells()), r.cells());
}
/**
@@ -677,7 +677,7 @@ future<stop_iteration> view_update_builder::on_results() {
if (tombstone && _existing && !_existing->is_end_of_partition()) {
// We don't care if it's a range tombstone, as we're only looking for existing entries that get deleted
if (_existing->is_clustering_row()) {
auto& existing = _existing->as_clustering_row();
auto existing = clustering_row(*_schema, _existing->as_clustering_row());
auto update = clustering_row(existing.key(), row_tombstone(std::move(tombstone)), row_marker(), ::row());
generate_update(std::move(update), { std::move(existing) });
}

View File

@@ -77,7 +77,7 @@ flat_mutation_reader flat_mutation_reader::impl::reverse_partitions(flat_mutatio
if (is_buffer_full()) {
return stop_iteration::yes;
}
push_mutation_fragment(*std::exchange(_partition_end, stdx::nullopt));
push_mutation_fragment(std::move(*std::exchange(_partition_end, stdx::nullopt)));
return stop_iteration::no;
}
future<stop_iteration> consume_partition_from_source(db::timeout_clock::time_point timeout) {

View File

@@ -37,9 +37,9 @@ mutation::data::data(partition_key&& key_, schema_ptr&& schema)
{ }
mutation::data::data(schema_ptr&& schema, dht::decorated_key&& key, const mutation_partition& mp)
: _schema(std::move(schema))
: _schema(schema)
, _dk(std::move(key))
, _p(mp)
, _p(*schema, mp)
{ }
mutation::data::data(schema_ptr&& schema, dht::decorated_key&& key, mutation_partition&& mp)

View File

@@ -190,7 +190,7 @@ public:
requires CompactedFragmentsConsumer<Consumer>
)
stop_iteration consume(static_row&& sr, Consumer& consumer) {
_last_static_row = sr;
_last_static_row = static_row(_schema, sr);
auto current_tombstone = _range_tombstones.get_partition_tombstone();
bool is_live = sr.cells().compact_and_expire(_schema, column_kind::static_column,
row_tombstone(current_tombstone),

View File

@@ -54,8 +54,10 @@ public:
: _ck(std::move(ck)), _t(t), _marker(std::move(marker)), _cells(std::move(cells)) {
_t.maybe_shadow(marker);
}
clustering_row(const rows_entry& re)
: clustering_row(re.key(), re.row().deleted_at(), re.row().marker(), re.row().cells()) { }
clustering_row(const schema& s, const clustering_row& other)
: clustering_row(other._ck, other._t, other._marker, row(s, column_kind::regular_column, other._cells)) { }
clustering_row(const schema& s, const rows_entry& re)
: clustering_row(re.key(), re.row().deleted_at(), re.row().marker(), row(s, column_kind::regular_column, re.row().cells())) { }
clustering_row(rows_entry&& re)
: clustering_row(std::move(re.key()), re.row().deleted_at(), re.row().marker(), std::move(re.row().cells())) { }
@@ -133,7 +135,8 @@ class static_row {
row _cells;
public:
static_row() = default;
explicit static_row(const row& r) : _cells(r) { }
static_row(const schema& s, const static_row& other) : static_row(s, other._cells) { }
explicit static_row(const schema& s, const row& r) : _cells(s, column_kind::static_column, r) { }
explicit static_row(row&& r) : _cells(std::move(r)) { }
row& cells() { return _cells; }
@@ -328,14 +331,14 @@ public:
mutation_fragment(partition_start&& r);
mutation_fragment(partition_end&& r);
mutation_fragment(const mutation_fragment& o)
mutation_fragment(const schema& s, const mutation_fragment& o)
: _kind(o._kind), _data(std::make_unique<data>()) {
switch(_kind) {
case kind::static_row:
new (&_data->_static_row) static_row(o._data->_static_row);
new (&_data->_static_row) static_row(s, o._data->_static_row);
break;
case kind::clustering_row:
new (&_data->_clustering_row) clustering_row(o._data->_clustering_row);
new (&_data->_clustering_row) clustering_row(s, o._data->_clustering_row);
break;
case kind::range_tombstone:
new (&_data->_range_tombstone) range_tombstone(o._data->_range_tombstone);
@@ -349,14 +352,6 @@ public:
}
}
mutation_fragment(mutation_fragment&& other) = default;
mutation_fragment& operator=(const mutation_fragment& other) {
if (this != &other) {
mutation_fragment copy(other);
this->~mutation_fragment();
new (this) mutation_fragment(std::move(copy));
}
return *this;
}
mutation_fragment& operator=(mutation_fragment&& other) noexcept {
if (this != &other) {
this->~mutation_fragment();

View File

@@ -139,14 +139,14 @@ struct reversal_traits<true> {
}
};
mutation_partition::mutation_partition(const mutation_partition& x)
mutation_partition::mutation_partition(const schema& s, const mutation_partition& x)
: _tombstone(x._tombstone)
, _static_row(x._static_row)
, _static_row(s, column_kind::static_column, x._static_row)
, _static_row_continuous(x._static_row_continuous)
, _rows()
, _row_tombstones(x._row_tombstones) {
auto cloner = [] (const auto& x) {
return current_allocator().construct<std::remove_const_t<std::remove_reference_t<decltype(x)>>>(x);
auto cloner = [&s] (const auto& x) {
return current_allocator().construct<rows_entry>(s, x);
};
_rows.clone_from(x._rows, cloner, current_deleter<rows_entry>());
}
@@ -154,14 +154,14 @@ mutation_partition::mutation_partition(const mutation_partition& x)
mutation_partition::mutation_partition(const mutation_partition& x, const schema& schema,
query::clustering_key_filter_ranges ck_ranges)
: _tombstone(x._tombstone)
, _static_row(x._static_row)
, _static_row(schema, column_kind::static_column, x._static_row)
, _static_row_continuous(x._static_row_continuous)
, _rows()
, _row_tombstones(x._row_tombstones, range_tombstone_list::copy_comparator_only()) {
try {
for(auto&& r : ck_ranges) {
for (const rows_entry& e : x.range(schema, r)) {
_rows.insert(_rows.end(), *current_allocator().construct<rows_entry>(e), rows_entry::compare(schema));
_rows.insert(_rows.end(), *current_allocator().construct<rows_entry>(schema, e), rows_entry::compare(schema));
}
for (auto&& rt : x._row_tombstones.slice(schema, r)) {
_row_tombstones.apply(schema, rt);
@@ -210,13 +210,6 @@ mutation_partition::~mutation_partition() {
_rows.clear_and_dispose(current_deleter<rows_entry>());
}
mutation_partition&
mutation_partition::operator=(const mutation_partition& x) {
mutation_partition n(x);
std::swap(*this, n);
return *this;
}
mutation_partition&
mutation_partition::operator=(mutation_partition&& x) noexcept {
if (this != &x) {
@@ -257,8 +250,8 @@ struct mutation_fragment_applier {
_mp.apply_row_tombstone(_s, std::move(rt));
}
void operator()(static_row sr) {
_mp.static_row().apply(_s, column_kind::static_column, std::move(sr.cells()));
void operator()(const static_row& sr) {
_mp.static_row().apply(_s, column_kind::static_column, sr.cells());
}
void operator()(partition_start ps) {
@@ -268,9 +261,10 @@ struct mutation_fragment_applier {
void operator()(partition_end ps) {
}
void operator()(clustering_row cr) {
auto& dr = _mp.clustered_row(_s, std::move(cr.key()));
dr.apply(_s, std::move(cr));
void operator()(const clustering_row& cr) {
auto temp = clustering_row(_s, cr);
auto& dr = _mp.clustered_row(_s, std::move(temp.key()));
dr.apply(_s, std::move(temp));
}
};
@@ -336,7 +330,7 @@ void mutation_partition::apply_monotonically(const schema& s, mutation_partition
if (s.version() == p_schema.version()) {
apply_monotonically(s, std::move(p), no_cache_tracker);
} else {
mutation_partition p2(p);
mutation_partition p2(s, p);
p2.upgrade(p_schema, s);
apply_monotonically(s, std::move(p2), no_cache_tracker);
}
@@ -353,7 +347,7 @@ mutation_partition::apply_weak(const schema& s, mutation_partition_view p, const
void mutation_partition::apply_weak(const schema& s, const mutation_partition& p, const schema& p_schema) {
// FIXME: Optimize
apply_monotonically(s, mutation_partition(p), p_schema);
apply_monotonically(s, mutation_partition(s, p), p_schema);
}
void mutation_partition::apply_weak(const schema& s, mutation_partition&& p) {
@@ -457,7 +451,7 @@ void mutation_partition::insert_row(const schema& s, const clustering_key& key,
}
void mutation_partition::insert_row(const schema& s, const clustering_key& key, const deletable_row& row) {
auto e = current_allocator().construct<rows_entry>(key, row);
auto e = current_allocator().construct<rows_entry>(s, key, row);
_rows.insert(_rows.end(), *e, rows_entry::compare(s));
}
@@ -1062,7 +1056,7 @@ apply_monotonically(const column_definition& def, cell_and_hash& dst,
void
row::apply(const column_definition& column, const atomic_cell_or_collection& value, cell_hash_opt hash) {
auto tmp = value;
auto tmp = value.copy(*column.type);
apply_monotonically(column, std::move(tmp), std::move(hash));
}
@@ -1396,15 +1390,24 @@ rows_entry::rows_entry(rows_entry&& o) noexcept
}
}
row::row(const row& o)
row::row(const schema& s, column_kind kind, const row& o)
: _type(o._type)
, _size(o._size)
{
if (_type == storage_type::vector) {
new (&_storage.vector) vector_storage(o._storage.vector);
auto& other_vec = o._storage.vector;
auto& vec = *new (&_storage.vector) vector_storage;
vec.present = other_vec.present;
vec.v.reserve(other_vec.v.size());
column_id id = 0;
for (auto& cell : other_vec.v) {
auto& cdef = s.column_at(kind, id++);
vec.v.emplace_back(cell_and_hash { cell.cell.copy(*cdef.type), cell.hash });
}
} else {
auto cloner = [] (const auto& x) {
return current_allocator().construct<std::remove_const_t<std::remove_reference_t<decltype(x)>>>(x);
auto cloner = [&] (const auto& x) {
auto& cdef = s.column_at(kind, x.id());
return current_allocator().construct<cell_entry>(*cdef.type, x);
};
new (&_storage.set) map_type;
try {
@@ -1425,9 +1428,9 @@ row::~row() {
}
}
row::cell_entry::cell_entry(const cell_entry& o)
row::cell_entry::cell_entry(const abstract_type& type, const cell_entry& o)
: _id(o._id)
, _cell_and_hash(o._cell_and_hash)
, _cell_and_hash{ o._cell_and_hash.cell.copy(type), o._cell_and_hash.hash }
{ }
row::cell_entry::cell_entry(cell_entry&& o) noexcept
@@ -1668,7 +1671,7 @@ row row::difference(const schema& s, column_kind kind, const row& other) const
}
auto& cdef = s.column_at(kind, c.first);
if (it == other_range.end() || it->first != c.first) {
r.append_cell(c.first, c.second);
r.append_cell(c.first, c.second.copy(*cdef.type));
} else if (cdef.is_counter()) {
auto cell = counter_cell_view::difference(c.second.as_atomic_cell(cdef), it->second.as_atomic_cell(cdef));
if (cell) {
@@ -1676,7 +1679,7 @@ row row::difference(const schema& s, column_kind kind, const row& other) const
}
} else if (s.column_at(kind, c.first).is_atomic()) {
if (compare_atomic_cell_for_merge(c.second.as_atomic_cell(cdef), it->second.as_atomic_cell(cdef)) > 0) {
r.append_cell(c.first, c.second);
r.append_cell(c.first, c.second.copy(*cdef.type));
}
} else {
auto ct = static_pointer_cast<const collection_type_impl>(s.column_at(kind, c.first).type);

View File

@@ -104,7 +104,7 @@ class row {
: _id(id)
{ }
cell_entry(cell_entry&&) noexcept;
cell_entry(const cell_entry&);
cell_entry(const abstract_type&, const cell_entry&);
column_id id() const { return _id; }
const atomic_cell_or_collection& cell() const { return _cell_and_hash.cell; }
@@ -166,7 +166,7 @@ private:
public:
row();
~row();
row(const row&);
row(const schema&, column_kind, const row&);
row(row&& other) noexcept;
row& operator=(row&& other) noexcept;
size_t size() const { return _size; }
@@ -647,8 +647,13 @@ class deletable_row final {
public:
deletable_row() {}
explicit deletable_row(clustering_row&&);
deletable_row(row_tombstone tomb, const row_marker& marker, const row& cells)
: _deleted_at(tomb), _marker(marker), _cells(cells)
deletable_row(const schema& s, const deletable_row& other)
: _deleted_at(other._deleted_at)
, _marker(other._marker)
, _cells(s, column_kind::regular_column, other._cells)
{ }
deletable_row(const schema& s, row_tombstone tomb, const row_marker& marker, const row& cells)
: _deleted_at(tomb), _marker(marker), _cells(s, column_kind::regular_column, cells)
{}
void apply(const schema&, clustering_row);
@@ -737,16 +742,16 @@ public:
rows_entry(const clustering_key& key, deletable_row&& row)
: _key(key), _row(std::move(row))
{ }
rows_entry(const clustering_key& key, const deletable_row& row)
: _key(key), _row(row)
rows_entry(const schema& s, const clustering_key& key, const deletable_row& row)
: _key(key), _row(s, row)
{ }
rows_entry(const clustering_key& key, row_tombstone tomb, const row_marker& marker, const row& row)
: _key(key), _row(tomb, marker, row)
rows_entry(const schema& s, const clustering_key& key, row_tombstone tomb, const row_marker& marker, const row& row)
: _key(key), _row(s, tomb, marker, row)
{ }
rows_entry(rows_entry&& o) noexcept;
rows_entry(const rows_entry& e)
rows_entry(const schema& s, const rows_entry& e)
: _key(e._key)
, _row(e._row)
, _row(s, e._row)
, _flags(e._flags)
{ }
// Valid only if !dummy()
@@ -911,12 +916,11 @@ public:
, _row_tombstones(other._row_tombstones, range_tombstone_list::copy_comparator_only())
{ }
mutation_partition(mutation_partition&&) = default;
mutation_partition(const mutation_partition&);
mutation_partition(const schema& s, const mutation_partition&);
mutation_partition(const mutation_partition&, const schema&, query::clustering_key_filter_ranges);
mutation_partition(mutation_partition&&, const schema&, query::clustering_key_filter_ranges);
~mutation_partition();
static mutation_partition& container_of(rows_type&);
mutation_partition& operator=(const mutation_partition& x);
mutation_partition& operator=(mutation_partition&& x) noexcept;
bool equal(const schema&, const mutation_partition&) const;
bool equal(const schema& this_schema, const mutation_partition& p, const schema& p_schema) const;

View File

@@ -421,7 +421,7 @@ future<mutation_reader_merger::mutation_fragment_batch> mutation_reader_merger::
while (!_reader_heap.empty() && key(_fragment_heap).equal(*_schema, key(_reader_heap)));
if (_fragment_heap.size() == 1) {
_single_reader = { _fragment_heap.back().reader, mutation_fragment::kind::partition_start };
_current.emplace_back(_fragment_heap.back().fragment);
_current.emplace_back(std::move(_fragment_heap.back().fragment));
_fragment_heap.clear();
return make_ready_future<mutation_fragment_batch>(_current);
}
@@ -951,7 +951,7 @@ future<> foreign_reader::fill_buffer(db::timeout_clock::time_point timeout) {
_end_of_stream = end_of_steam;
for (const auto& mf : *buffer) {
// Need a copy since the mf is on the remote shard.
push_mutation_fragment(mf);
push_mutation_fragment(mutation_fragment(*_schema, mf));
}
});
}

View File

@@ -212,7 +212,7 @@ class partition_snapshot_flat_reader : public flat_mutation_reader::impl, public
if (_digest_requested) {
e.row().cells().prepare_hash(_schema, column_kind::regular_column);
}
auto result = mutation_fragment(mutation_fragment::clustering_row_tag_t(), e);
auto result = mutation_fragment(mutation_fragment::clustering_row_tag_t(), _schema, e);
while (has_more_rows() && _eq(peek_row().position(), result.as_clustering_row().position())) {
const rows_entry& e = pop_clustering_row();
if (_digest_requested) {

View File

@@ -361,7 +361,7 @@ public:
if (digest_requested) {
row->row().cells().prepare_hash(_schema, column_kind::regular_column);
}
auto mf = mutation_fragment(clustering_row(*row));
auto mf = mutation_fragment(clustering_row(_schema, *row));
auto& cr = mf.as_mutable_clustering_row();
for (++it; it != _current_row.end(); ++it) {
cr.apply(_schema, *it->it);
@@ -377,7 +377,7 @@ public:
if (v.unique_owner) {
consumer(std::move(v.it->row()));
} else {
consumer(deletable_row(v.it->row()));
consumer(deletable_row(_schema, v.it->row()));
}
}
}
@@ -413,7 +413,7 @@ public:
} else {
// Copy row from older version because rows in evictable versions must
// hold values which are independently complete to be consistent on eviction.
auto e = current_allocator().construct<rows_entry>(*_current_row[0].it);
auto e = current_allocator().construct<rows_entry>(_schema, *_current_row[0].it);
e->set_continuous(latest_i != rows.end() && latest_i->continuous());
_snp.tracker()->insert(*e);
rows.insert_before(latest_i, *e);

View File

@@ -108,14 +108,14 @@ concept bool Reducer() {
// the version chain starting from v.
// |map| extracts the part from each version.
// |reduce| Combines parts from the two versions.
template <typename Result, typename Map, typename Reduce>
template <typename Result, typename Map, typename Initial, typename Reduce>
GCC6_CONCEPT(
requires Mapper<Map, mutation_partition, Result>() && Reducer<Reduce, Result>()
)
inline Result squashed(const partition_version_ref& v, Map&& map, Reduce&& reduce) {
inline Result squashed(const partition_version_ref& v, Map&& map, Initial&& initial, Reduce&& reduce) {
const partition_version* this_v = &*v;
partition_version* it = v->last();
Result r = map(it->partition());
Result r = initial(map(it->partition()));
while (it != this_v) {
it = it->prev();
reduce(r, map(it->partition()));
@@ -123,6 +123,16 @@ inline Result squashed(const partition_version_ref& v, Map&& map, Reduce&& reduc
return r;
}
template <typename Result, typename Map, typename Reduce>
GCC6_CONCEPT(
requires Mapper<Map, mutation_partition, Result>() && Reducer<Reduce, Result>()
)
inline Result squashed(const partition_version_ref& v, Map&& map, Reduce&& reduce) {
return squashed<Result>(v, map,
[] (auto&& o) -> decltype(auto) { return std::forward<decltype(o)>(o); },
reduce);
}
}
::static_row partition_snapshot::static_row(bool digest_requested) const {
@@ -133,6 +143,7 @@ inline Result squashed(const partition_version_ref& v, Map&& map, Reduce&& reduc
}
return mp.static_row();
},
[this] (const row& r) { return row(*_schema, column_kind::static_column, r); },
[this] (row& a, const row& b) { a.apply(*_schema, column_kind::static_column, b); }));
}
@@ -149,6 +160,7 @@ tombstone partition_snapshot::partition_tombstone() const {
mutation_partition partition_snapshot::squashed() const {
return ::squashed<mutation_partition>(version(),
[] (const mutation_partition& mp) -> const mutation_partition& { return mp; },
[this] (const mutation_partition& mp) { return mutation_partition(*_schema, mp); },
[this] (mutation_partition& a, const mutation_partition& b) { a.apply(*_schema, b, *_schema); });
}
@@ -222,7 +234,7 @@ partition_entry partition_entry::make_evictable(const schema& s, mutation_partit
}
partition_entry partition_entry::make_evictable(const schema& s, const mutation_partition& mp) {
return make_evictable(s, mutation_partition(mp));
return make_evictable(s, mutation_partition(s, mp));
}
partition_entry::~partition_entry() {
@@ -300,7 +312,7 @@ partition_version& partition_entry::add_version(const schema& s, cache_tracker*
void partition_entry::apply(const schema& s, const mutation_partition& mp, const schema& mp_schema)
{
apply(s, mutation_partition(mp), mp_schema);
apply(s, mutation_partition(s, mp), mp_schema);
}
void partition_entry::apply(const schema& s, mutation_partition&& mp, const schema& mp_schema)
@@ -385,7 +397,7 @@ public:
// due to the fact that all rows are continuous.
for (version& v : _current_row) {
if (!v.can_move) {
consumer(deletable_row(v.current_row->row()));
consumer(deletable_row(_schema, v.current_row->row()));
} else {
consumer(std::move(v.current_row->row()));
}
@@ -541,7 +553,7 @@ mutation_partition partition_entry::squashed(schema_ptr from, schema_ptr to)
mutation_partition mp(to);
mp.set_static_row_continuous(_version->partition().static_row_continuous());
for (auto&& v : _version->all_elements()) {
auto older = v.partition();
auto older = mutation_partition(*from, v.partition());
if (from->version() != to->version()) {
older.upgrade(*from, *to);
}

View File

@@ -100,7 +100,7 @@ public:
cache_entry(schema_ptr s, const dht::decorated_key& key, const mutation_partition& p)
: _schema(std::move(s))
, _key(key)
, _pe(partition_entry::make_evictable(*_schema, mutation_partition(p)))
, _pe(partition_entry::make_evictable(*_schema, mutation_partition(*_schema, p)))
{ }
cache_entry(schema_ptr s, dht::decorated_key&& key, mutation_partition&& p)

View File

@@ -2257,7 +2257,7 @@ private:
static primary_key get_last_reconciled_row(const schema& s, const mutation_and_live_row_count& m_a_rc, const query::read_command& cmd, uint32_t limit, bool is_reversed) {
const auto& m = m_a_rc.mut;
auto mp = m.partition();
auto mp = mutation_partition(s, m.partition());
auto&& ranges = cmd.slice.row_ranges(s, m.key());
mp.compact_for_query(s, cmd.timestamp, ranges, is_reversed, limit);
@@ -2516,7 +2516,7 @@ public:
for (const version& v : z.get<0>()) {
auto diff = v.par
? m.partition().difference(schema, v.par->mut().unfreeze(schema).partition())
: m.partition();
: mutation_partition(*schema, m.partition());
auto it = _diffs[m.token()].find(v.from);
std::experimental::optional<mutation> mdiff;
if (!diff.empty()) {

View File

@@ -133,7 +133,7 @@ SEASTAR_TEST_CASE(test_disjoint_mutations) {
});
auto m3 = mutation(s, partition_key::from_single_value(*s, to_bytes("1")));
m3.partition() = m1.partition();
m3.partition() = mutation_partition(*s, m1.partition());
auto l1 = cl.lock_cells(m1.decorated_key(), partition_cells_range(m1.partition()), no_timeout).get0();
auto l2 = cl.lock_cells(m2.decorated_key(), partition_cells_range(m2.partition()), no_timeout).get0();

View File

@@ -92,7 +92,9 @@ SEASTAR_TEST_CASE(test_apply) {
return seastar::async([] {
auto cdef = column_definition("name", counter_type, column_kind::regular_column);
auto verify_apply = [&] (atomic_cell_or_collection dst, atomic_cell_or_collection src, int64_t value) {
auto verify_apply = [&] (const atomic_cell_or_collection& a, const atomic_cell_or_collection& b, int64_t value) {
auto dst = a.copy(*cdef.type);
auto src = b.copy(*cdef.type);
counter_cell_view::apply(cdef, dst, src);
auto cv = counter_cell_view(dst.as_atomic_cell(cdef));

View File

@@ -82,7 +82,7 @@ struct mock_consumer {
}
result consume_end_of_stream() {
_result._consume_end_of_stream_called = true;
return _result;
return std::move(_result);
}
};

View File

@@ -512,7 +512,7 @@ SEASTAR_TEST_CASE(test_hash_is_cached) {
{
auto rd = mt->make_flat_reader(s);
rd().get0()->as_partition_start();
clustering_row row = rd().get0()->as_clustering_row();
clustering_row row = std::move(rd().get0()->as_mutable_clustering_row());
BOOST_REQUIRE(!row.cells().cell_hash_for(0));
}
@@ -521,14 +521,14 @@ SEASTAR_TEST_CASE(test_hash_is_cached) {
slice.options.set<query::partition_slice::option::with_digest>();
auto rd = mt->make_flat_reader(s, query::full_partition_range, slice);
rd().get0()->as_partition_start();
clustering_row row = rd().get0()->as_clustering_row();
clustering_row row = std::move(rd().get0()->as_mutable_clustering_row());
BOOST_REQUIRE(row.cells().cell_hash_for(0));
}
{
auto rd = mt->make_flat_reader(s);
rd().get0()->as_partition_start();
clustering_row row = rd().get0()->as_clustering_row();
clustering_row row = std::move(rd().get0()->as_mutable_clustering_row());
BOOST_REQUIRE(row.cells().cell_hash_for(0));
}
@@ -538,7 +538,7 @@ SEASTAR_TEST_CASE(test_hash_is_cached) {
{
auto rd = mt->make_flat_reader(s);
rd().get0()->as_partition_start();
clustering_row row = rd().get0()->as_clustering_row();
clustering_row row = std::move(rd().get0()->as_mutable_clustering_row());
BOOST_REQUIRE(!row.cells().cell_hash_for(0));
}
@@ -547,14 +547,14 @@ SEASTAR_TEST_CASE(test_hash_is_cached) {
slice.options.set<query::partition_slice::option::with_digest>();
auto rd = mt->make_flat_reader(s, query::full_partition_range, slice);
rd().get0()->as_partition_start();
clustering_row row = rd().get0()->as_clustering_row();
clustering_row row = std::move(rd().get0()->as_mutable_clustering_row());
BOOST_REQUIRE(row.cells().cell_hash_for(0));
}
{
auto rd = mt->make_flat_reader(s);
rd().get0()->as_partition_start();
clustering_row row = rd().get0()->as_clustering_row();
clustering_row row = std::move(rd().get0()->as_mutable_clustering_row());
BOOST_REQUIRE(row.cells().cell_hash_for(0));
}
});

View File

@@ -781,7 +781,7 @@ SEASTAR_TEST_CASE(test_apply_monotonically_is_monotonic) {
size_t fail_offset = 0;
while (true) {
mutation m = target;
mutation_partition m2 = second.partition();
auto m2 = mutation_partition(*m.schema(), second.partition());
alloc.fail_after(fail_offset++);
try {
m.partition().apply_monotonically(*m.schema(), std::move(m2), no_cache_tracker);
@@ -1293,7 +1293,7 @@ SEASTAR_TEST_CASE(test_slicing_mutation) {
auto test_slicing = [&] (query::clustering_row_ranges ranges, std::vector<int> expected_rows) {
mutation_partition mp1(m.partition(), *s, ranges);
auto mp_temp = m.partition();
auto mp_temp = mutation_partition(*s, m.partition());
mutation_partition mp2(std::move(mp_temp), *s, ranges);
BOOST_REQUIRE(mp1.equal(*s, mp2));

View File

@@ -95,7 +95,7 @@ SEASTAR_TEST_CASE(test_range_tombstone_slicing) {
m1.apply_delete(*s, rt2);
m1.apply_delete(*s, rt3);
partition_entry e(m1);
partition_entry e(mutation_partition(*s, m1));
auto snap = e.read(r, cleaner, s, no_cache_tracker);
@@ -263,7 +263,7 @@ mvcc_partition& mvcc_partition::operator+=(const mutation& m) {
void mvcc_partition::apply(const mutation_partition& mp, schema_ptr mp_s) {
with_allocator(region().allocator(), [&] {
if (_evictable) {
apply_to_evictable(partition_entry(mp), mp_s);
apply_to_evictable(partition_entry(mutation_partition(*mp_s, mp)), mp_s);
} else {
logalloc::allocating_section as;
as(region(), [&] {
@@ -473,12 +473,12 @@ SEASTAR_TEST_CASE(test_apply_to_incomplete_respects_continuity) {
auto before = e.squashed();
auto e_continuity = before.get_continuity(*s);
auto expected_to_apply_slice = to_apply.partition();
auto expected_to_apply_slice = mutation_partition(*s, to_apply.partition());
if (!before.static_row_continuous()) {
expected_to_apply_slice.static_row() = {};
}
auto expected = before;
auto expected = mutation_partition(*s, before);
expected.apply_weak(*s, std::move(expected_to_apply_slice));
e += to_apply;
@@ -565,7 +565,7 @@ SEASTAR_TEST_CASE(test_snapshot_cursor_is_consistent_with_merging_for_nonevictab
{
logalloc::reclaim_lock rl(r);
auto e = partition_entry(m3.partition());
auto e = partition_entry(mutation_partition(*s, m3.partition()));
auto snap1 = e.read(r, cleaner, s, no_cache_tracker);
e.apply(*s, m2.partition(), *s);
auto snap2 = e.read(r, cleaner, s, no_cache_tracker);
@@ -609,7 +609,7 @@ SEASTAR_TEST_CASE(test_continuity_merging_in_evictable) {
e.add_version(*s, &tracker).partition()
.clustered_row(*s, ss.make_ckey(2), is_dummy::no, is_continuous::no);
auto expected = m1.partition();
auto expected = mutation_partition(*s, m1.partition());
expected.clustered_row(*s, ss.make_ckey(1), is_dummy::no, is_continuous::no);
expected.clustered_row(*s, ss.make_ckey(2), is_dummy::no, is_continuous::no);
@@ -811,8 +811,8 @@ SEASTAR_TEST_CASE(test_apply_is_atomic) {
size_t fail_offset = 0;
while (true) {
mutation_partition m2 = second.partition();
auto e = partition_entry(target.partition());
mutation_partition m2 = mutation_partition(*second.schema(), second.partition());
auto e = partition_entry(mutation_partition(*target.schema(), target.partition()));
//auto snap1 = e.read(r, gen.schema());
alloc.fail_after(fail_offset++);
@@ -858,7 +858,7 @@ SEASTAR_TEST_CASE(test_versions_are_merged_when_snapshots_go_away) {
m3.partition().make_fully_continuous();
{
auto e = partition_entry(m1.partition());
auto e = partition_entry(mutation_partition(*s, m1.partition()));
auto snap1 = e.read(r, cleaner, s, nullptr);
{
@@ -879,7 +879,7 @@ SEASTAR_TEST_CASE(test_versions_are_merged_when_snapshots_go_away) {
}
{
auto e = partition_entry(m1.partition());
auto e = partition_entry(mutation_partition(*s, m1.partition()));
auto snap1 = e.read(r, cleaner, s, nullptr);
{

View File

@@ -97,19 +97,19 @@ PERF_TEST_F(clustering_row, make_1M)
PERF_TEST_F(clustering_row, copy_4)
{
auto mf = mutation_fragment(clustering_row_4());
auto mf = mutation_fragment(*schema(), clustering_row_4());
perf_tests::do_not_optimize(mf);
}
PERF_TEST_F(clustering_row, copy_4k)
{
auto mf = mutation_fragment(clustering_row_4k());
auto mf = mutation_fragment(*schema(), clustering_row_4k());
perf_tests::do_not_optimize(mf);
}
PERF_TEST_F(clustering_row, copy_1M)
{
auto mf = mutation_fragment(clustering_row_1M());
auto mf = mutation_fragment(*schema(), clustering_row_1M());
perf_tests::do_not_optimize(mf);
}

View File

@@ -952,7 +952,7 @@ SEASTAR_TEST_CASE(test_update_failure) {
auto original_partitions = partitions_type(partition_key::less_compare(*s));
for (int i = 0; i < partition_count / 2; i++) {
auto m = make_new_mutation(s, i + partition_count / 2);
original_partitions.emplace(m.key(), m.partition());
original_partitions.emplace(m.key(), mutation_partition(*s, m.partition()));
cache.populate(m);
}
@@ -961,7 +961,7 @@ SEASTAR_TEST_CASE(test_update_failure) {
auto updated_partitions = partitions_type(partition_key::less_compare(*s));
for (int i = 0; i < partition_count; i++) {
auto m = make_new_large_mutation(s, i);
updated_partitions.emplace(m.key(), m.partition());
updated_partitions.emplace(m.key(), mutation_partition(*s, m.partition()));
mt->apply(m);
}
@@ -3120,7 +3120,7 @@ SEASTAR_TEST_CASE(test_hash_is_cached) {
{
auto rd = cache.make_reader(s);
rd().get0()->as_partition_start();
clustering_row row = rd().get0()->as_clustering_row();
clustering_row row = std::move(rd().get0()->as_mutable_clustering_row());
BOOST_REQUIRE(!row.cells().cell_hash_for(0));
}
@@ -3129,14 +3129,14 @@ SEASTAR_TEST_CASE(test_hash_is_cached) {
slice.options.set<query::partition_slice::option::with_digest>();
auto rd = cache.make_reader(s, query::full_partition_range, slice);
rd().get0()->as_partition_start();
clustering_row row = rd().get0()->as_clustering_row();
clustering_row row = std::move(rd().get0()->as_mutable_clustering_row());
BOOST_REQUIRE(row.cells().cell_hash_for(0));
}
{
auto rd = cache.make_reader(s);
rd().get0()->as_partition_start();
clustering_row row = rd().get0()->as_clustering_row();
clustering_row row = std::move(rd().get0()->as_mutable_clustering_row());
BOOST_REQUIRE(row.cells().cell_hash_for(0));
}
@@ -3147,7 +3147,7 @@ SEASTAR_TEST_CASE(test_hash_is_cached) {
{
auto rd = cache.make_reader(s);
rd().get0()->as_partition_start();
clustering_row row = rd().get0()->as_clustering_row();
clustering_row row = std::move(rd().get0()->as_mutable_clustering_row());
BOOST_REQUIRE(!row.cells().cell_hash_for(0));
}
@@ -3156,14 +3156,14 @@ SEASTAR_TEST_CASE(test_hash_is_cached) {
slice.options.set<query::partition_slice::option::with_digest>();
auto rd = cache.make_reader(s, query::full_partition_range, slice);
rd().get0()->as_partition_start();
clustering_row row = rd().get0()->as_clustering_row();
clustering_row row = std::move(rd().get0()->as_mutable_clustering_row());
BOOST_REQUIRE(row.cells().cell_hash_for(0));
}
{
auto rd = cache.make_reader(s);
rd().get0()->as_partition_start();
clustering_row row = rd().get0()->as_clustering_row();
clustering_row row = std::move(rd().get0()->as_mutable_clustering_row());
BOOST_REQUIRE(row.cells().cell_hash_for(0));
}
});

View File

@@ -1415,7 +1415,7 @@ SEASTAR_TEST_CASE(datafile_generation_37) {
auto clustering = clustering_key_prefix::from_exploded(*s, {to_bytes("cl1")});
auto row = mp.clustered_row(*s, clustering);
auto& row = mp.clustered_row(*s, clustering);
match_live_cell(row.cells(), *s, "cl2", data_value(to_bytes("cl2")));
return make_ready_future<>();
});
@@ -1449,7 +1449,7 @@ SEASTAR_TEST_CASE(datafile_generation_38) {
auto& mp = mutation->partition();
auto clustering = clustering_key_prefix::from_exploded(*s, {to_bytes("cl1"), to_bytes("cl2")});
auto row = mp.clustered_row(*s, clustering);
auto& row = mp.clustered_row(*s, clustering);
match_live_cell(row.cells(), *s, "cl3", data_value(to_bytes("cl3")));
return make_ready_future<>();
});
@@ -1483,7 +1483,7 @@ SEASTAR_TEST_CASE(datafile_generation_39) {
auto rd = make_lw_shared<flat_mutation_reader>(sstp->read_row_flat(s, key));
return read_mutation_from_flat_mutation_reader(*rd).then([sstp, s, rd] (auto mutation) {
auto& mp = mutation->partition();
auto row = mp.clustered_row(*s, clustering_key::make_empty());
auto& row = mp.clustered_row(*s, clustering_key::make_empty());
match_live_cell(row.cells(), *s, "cl1", data_value(data_value(to_bytes("cl1"))));
match_live_cell(row.cells(), *s, "cl2", data_value(data_value(to_bytes("cl2"))));
return make_ready_future<>();
@@ -1580,7 +1580,7 @@ SEASTAR_TEST_CASE(datafile_generation_41) {
return read_mutation_from_flat_mutation_reader(*rd).then([sstp, s, tomb, rd] (auto mutation) {
auto& mp = mutation->partition();
BOOST_REQUIRE(mp.clustered_rows().calculate_size() == 1);
auto c_row = *(mp.clustered_rows().begin());
auto& c_row = *(mp.clustered_rows().begin());
BOOST_REQUIRE(c_row.row().deleted_at().tomb() == tomb);
});
});

View File

@@ -142,14 +142,14 @@ future<mutation> generate_clustered(bytes&& key) {
inline auto clustered_row(mutation& mutation, const schema& s, std::vector<bytes>&& v) {
auto exploded = exploded_clustering_prefix(std::move(v));
auto clustering_pair = clustering_key::from_clustering_prefix(s, exploded);
return mutation.partition().clustered_row(s, clustering_pair);
return deletable_row(s, mutation.partition().clustered_row(s, clustering_pair));
}
SEASTAR_TEST_CASE(complex_sst1_k1) {
return generate_clustered<1>("key1").then([] (auto&& mutation) {
auto s = complex_schema();
auto sr = mutation.partition().static_row();
auto& sr = mutation.partition().static_row();
match_live_cell(sr, *s, "static_obj", data_value(to_bytes("static_value")));
auto row1 = clustered_row(mutation, *s, {"cl1.1", "cl2.1"});
@@ -178,7 +178,7 @@ SEASTAR_TEST_CASE(complex_sst1_k2) {
return generate_clustered<1>("key2").then([] (auto&& mutation) {
auto s = complex_schema();
auto sr = mutation.partition().static_row();
auto& sr = mutation.partition().static_row();
match_live_cell(sr, *s, "static_obj", data_value(to_bytes("static_value")));
auto static_set = match_collection(sr, *s, "static_collection", tombstone(deletion_time{1431451390, 1431451390225257l}));
match_collection_element<status::live>(static_set.cells[0], to_bytes("1"), bytes_opt{});
@@ -227,7 +227,7 @@ SEASTAR_TEST_CASE(complex_sst2_k2) {
return generate_clustered<2>("key2").then([] (auto&& mutation) {
auto s = complex_schema();
auto sr = mutation.partition().static_row();
auto& sr = mutation.partition().static_row();
match_dead_cell(sr, *s, "static_obj");
auto static_set = match_collection(sr, *s, "static_collection", tombstone(deletion_time{0, api::missing_timestamp}));
match_collection_element<status::dead>(static_set.cells[0], to_bytes("1"), bytes_opt{});
@@ -256,7 +256,7 @@ SEASTAR_TEST_CASE(complex_sst2_k3) {
return generate_clustered<2>("key3").then([] (auto&& mutation) {
auto s = complex_schema();
auto sr = mutation.partition().static_row();
auto& sr = mutation.partition().static_row();
match_expiring_cell(sr, *s, "static_obj", data_value(to_bytes("static_value_3")), 1431451394597062l, 1431537794);
auto row1 = clustered_row(mutation, *s, {"tcl1.1", "tcl2.1"});
@@ -294,7 +294,7 @@ SEASTAR_TEST_CASE(complex_sst3_k2) {
return generate_clustered<3>("key2").then([] (auto&& mutation) {
auto s = complex_schema();
auto sr = mutation.partition().static_row();
auto& sr = mutation.partition().static_row();
match_live_cell(sr, *s, "static_obj", data_value(to_bytes("final_static")));
auto row = clustered_row(mutation, *s, {"kcl1.1", "kcl2.1"});
@@ -451,7 +451,7 @@ SEASTAR_TEST_CASE(compact_storage_sparse_read) {
return read_mutation_from_flat_mutation_reader(*rd).then([sstp, s, &key, rd] (auto mutation) {
BOOST_REQUIRE(mutation);
auto& mp = mutation->partition();
auto row = mp.clustered_row(*s, clustering_key::make_empty());
auto& row = mp.clustered_row(*s, clustering_key::make_empty());
match_live_cell(row.cells(), *s, "cl1", data_value(to_bytes("cl1")));
match_live_cell(row.cells(), *s, "cl2", data_value(to_bytes("cl2")));
return make_ready_future<>();
@@ -471,7 +471,7 @@ SEASTAR_TEST_CASE(compact_storage_simple_dense_read) {
auto exploded = exploded_clustering_prefix({"cl1"});
auto clustering = clustering_key::from_clustering_prefix(*s, exploded);
auto row = mp.clustered_row(*s, clustering);
auto& row = mp.clustered_row(*s, clustering);
match_live_cell(row.cells(), *s, "cl2", data_value(to_bytes("cl2")));
return make_ready_future<>();
});
@@ -490,7 +490,7 @@ SEASTAR_TEST_CASE(compact_storage_dense_read) {
auto exploded = exploded_clustering_prefix({"cl1", "cl2"});
auto clustering = clustering_key::from_clustering_prefix(*s, exploded);
auto row = mp.clustered_row(*s, clustering);
auto& row = mp.clustered_row(*s, clustering);
match_live_cell(row.cells(), *s, "cl3", data_value(to_bytes("cl3")));
return make_ready_future<>();
});
@@ -515,10 +515,10 @@ SEASTAR_TEST_CASE(broken_ranges_collection) {
if (!mut) {
return stop_iteration::yes;
} else if (key_equal("127.0.0.1")) {
auto row = mut->partition().clustered_row(*s, clustering_key::make_empty());
auto& row = mut->partition().clustered_row(*s, clustering_key::make_empty());
match_absent(row.cells(), *s, "tokens");
} else if (key_equal("127.0.0.3")) {
auto row = mut->partition().clustered_row(*s, clustering_key::make_empty());
auto& row = mut->partition().clustered_row(*s, clustering_key::make_empty());
auto tokens = match_collection(row.cells(), *s, "tokens", tombstone(deletion_time{0x55E5F2D5, 0x051EB3FC99715Dl }));
match_collection_element<status::live>(tokens.cells[0], to_bytes("-8180144272884242102"), bytes_opt{});
} else {
@@ -610,7 +610,7 @@ SEASTAR_TEST_CASE(tombstone_in_tombstone) {
tombstone(1459334681228103LL, it->tomb.deletion_time))));
auto& rows = mut->partition().clustered_rows();
BOOST_REQUIRE(rows.calculate_size() == 1);
for (auto e : rows) {
for (auto& e : rows) {
BOOST_REQUIRE(e.key().equal(*s, make_ckey("aaa", "bbb")));
BOOST_REQUIRE(e.row().deleted_at().tomb().timestamp == 1459334681244989LL);
}
@@ -755,7 +755,7 @@ SEASTAR_TEST_CASE(tombstone_in_tombstone2) {
BOOST_REQUIRE(it == rts.end());
BOOST_REQUIRE(rows.calculate_size() == 1);
for (auto e : rows) {
for (auto& e : rows) {
BOOST_REQUIRE(e.key().equal(*s, make_ckey("aaa", "bbb", "ccc")));
BOOST_REQUIRE(e.row().deleted_at().tomb().timestamp == 1459438519958850LL);
}