treewide: require type info for copying atomic_cell_or_collection
This commit is contained in:
@@ -35,12 +35,17 @@ private:
|
||||
atomic_cell_or_collection(managed_bytes&& data) : _data(std::move(data)) {}
|
||||
public:
|
||||
atomic_cell_or_collection() = default;
|
||||
atomic_cell_or_collection(atomic_cell_or_collection&&) = default;
|
||||
atomic_cell_or_collection(const atomic_cell_or_collection&) = delete;
|
||||
atomic_cell_or_collection& operator=(atomic_cell_or_collection&&) = default;
|
||||
atomic_cell_or_collection& operator=(const atomic_cell_or_collection&) = delete;
|
||||
atomic_cell_or_collection(atomic_cell ac) : _data(std::move(ac._data)) {}
|
||||
static atomic_cell_or_collection from_atomic_cell(atomic_cell data) { return { std::move(data._data) }; }
|
||||
atomic_cell_view as_atomic_cell(const column_definition&) const { return atomic_cell_view::from_bytes(_data); }
|
||||
atomic_cell_ref as_atomic_cell_ref(const column_definition&) { return { _data }; }
|
||||
atomic_cell_mutable_view as_mutable_atomic_cell(const column_definition&) { return atomic_cell_mutable_view::from_bytes(_data); }
|
||||
atomic_cell_or_collection(collection_mutation cm) : _data(std::move(cm.data)) {}
|
||||
atomic_cell_or_collection copy(const abstract_type&) const { return managed_bytes(_data); }
|
||||
explicit operator bool() const {
|
||||
return !_data.empty();
|
||||
}
|
||||
|
||||
@@ -365,7 +365,7 @@ bool cache_flat_mutation_reader::ensure_population_lower_bound() {
|
||||
rows_entry::compare less(*_schema);
|
||||
// FIXME: Avoid the copy by inserting an incomplete clustering row
|
||||
auto e = alloc_strategy_unique_ptr<rows_entry>(
|
||||
current_allocator().construct<rows_entry>(*_last_row));
|
||||
current_allocator().construct<rows_entry>(*_schema, *_last_row));
|
||||
e->set_continuous(false);
|
||||
auto insert_result = rows.insert_check(rows.end(), *e, less);
|
||||
auto inserted = insert_result.second;
|
||||
@@ -418,7 +418,7 @@ void cache_flat_mutation_reader::maybe_add_to_cache(const clustering_row& cr) {
|
||||
cr.cells().prepare_hash(*_schema, column_kind::regular_column);
|
||||
}
|
||||
auto new_entry = alloc_strategy_unique_ptr<rows_entry>(
|
||||
current_allocator().construct<rows_entry>(cr.key(), cr.tomb(), cr.marker(), cr.cells()));
|
||||
current_allocator().construct<rows_entry>(*_schema, cr.key(), cr.tomb(), cr.marker(), cr.cells()));
|
||||
new_entry->set_continuous(false);
|
||||
auto it = _next_row.iterators_valid() ? _next_row.get_iterator_in_latest_version()
|
||||
: mp.clustered_rows().lower_bound(cr.key(), less);
|
||||
|
||||
@@ -634,7 +634,7 @@ column_family::find_row(schema_ptr s, const dht::decorated_key& partition_key, c
|
||||
auto r = p->find_row(*s, clustering_key);
|
||||
if (r) {
|
||||
// FIXME: remove copy if only one data source
|
||||
return make_ready_future<const_row_ptr>(std::make_unique<row>(*r));
|
||||
return make_ready_future<const_row_ptr>(std::make_unique<row>(*s, column_kind::regular_column, *r));
|
||||
} else {
|
||||
return make_ready_future<const_row_ptr>();
|
||||
}
|
||||
|
||||
@@ -367,7 +367,7 @@ void view_updates::create_entry(const partition_key& base_key, const clustering_
|
||||
auto marker = compute_row_marker(update);
|
||||
r.apply(marker);
|
||||
r.apply(update.tomb());
|
||||
add_cells_to_view(*_base, *_view, update.cells(), r.cells());
|
||||
add_cells_to_view(*_base, *_view, row(*_base, column_kind::regular_column, update.cells()), r.cells());
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -677,7 +677,7 @@ future<stop_iteration> view_update_builder::on_results() {
|
||||
if (tombstone && _existing && !_existing->is_end_of_partition()) {
|
||||
// We don't care if it's a range tombstone, as we're only looking for existing entries that get deleted
|
||||
if (_existing->is_clustering_row()) {
|
||||
auto& existing = _existing->as_clustering_row();
|
||||
auto existing = clustering_row(*_schema, _existing->as_clustering_row());
|
||||
auto update = clustering_row(existing.key(), row_tombstone(std::move(tombstone)), row_marker(), ::row());
|
||||
generate_update(std::move(update), { std::move(existing) });
|
||||
}
|
||||
|
||||
@@ -77,7 +77,7 @@ flat_mutation_reader flat_mutation_reader::impl::reverse_partitions(flat_mutatio
|
||||
if (is_buffer_full()) {
|
||||
return stop_iteration::yes;
|
||||
}
|
||||
push_mutation_fragment(*std::exchange(_partition_end, stdx::nullopt));
|
||||
push_mutation_fragment(std::move(*std::exchange(_partition_end, stdx::nullopt)));
|
||||
return stop_iteration::no;
|
||||
}
|
||||
future<stop_iteration> consume_partition_from_source(db::timeout_clock::time_point timeout) {
|
||||
|
||||
@@ -37,9 +37,9 @@ mutation::data::data(partition_key&& key_, schema_ptr&& schema)
|
||||
{ }
|
||||
|
||||
mutation::data::data(schema_ptr&& schema, dht::decorated_key&& key, const mutation_partition& mp)
|
||||
: _schema(std::move(schema))
|
||||
: _schema(schema)
|
||||
, _dk(std::move(key))
|
||||
, _p(mp)
|
||||
, _p(*schema, mp)
|
||||
{ }
|
||||
|
||||
mutation::data::data(schema_ptr&& schema, dht::decorated_key&& key, mutation_partition&& mp)
|
||||
|
||||
@@ -190,7 +190,7 @@ public:
|
||||
requires CompactedFragmentsConsumer<Consumer>
|
||||
)
|
||||
stop_iteration consume(static_row&& sr, Consumer& consumer) {
|
||||
_last_static_row = sr;
|
||||
_last_static_row = static_row(_schema, sr);
|
||||
auto current_tombstone = _range_tombstones.get_partition_tombstone();
|
||||
bool is_live = sr.cells().compact_and_expire(_schema, column_kind::static_column,
|
||||
row_tombstone(current_tombstone),
|
||||
|
||||
@@ -54,8 +54,10 @@ public:
|
||||
: _ck(std::move(ck)), _t(t), _marker(std::move(marker)), _cells(std::move(cells)) {
|
||||
_t.maybe_shadow(marker);
|
||||
}
|
||||
clustering_row(const rows_entry& re)
|
||||
: clustering_row(re.key(), re.row().deleted_at(), re.row().marker(), re.row().cells()) { }
|
||||
clustering_row(const schema& s, const clustering_row& other)
|
||||
: clustering_row(other._ck, other._t, other._marker, row(s, column_kind::regular_column, other._cells)) { }
|
||||
clustering_row(const schema& s, const rows_entry& re)
|
||||
: clustering_row(re.key(), re.row().deleted_at(), re.row().marker(), row(s, column_kind::regular_column, re.row().cells())) { }
|
||||
clustering_row(rows_entry&& re)
|
||||
: clustering_row(std::move(re.key()), re.row().deleted_at(), re.row().marker(), std::move(re.row().cells())) { }
|
||||
|
||||
@@ -133,7 +135,8 @@ class static_row {
|
||||
row _cells;
|
||||
public:
|
||||
static_row() = default;
|
||||
explicit static_row(const row& r) : _cells(r) { }
|
||||
static_row(const schema& s, const static_row& other) : static_row(s, other._cells) { }
|
||||
explicit static_row(const schema& s, const row& r) : _cells(s, column_kind::static_column, r) { }
|
||||
explicit static_row(row&& r) : _cells(std::move(r)) { }
|
||||
|
||||
row& cells() { return _cells; }
|
||||
@@ -328,14 +331,14 @@ public:
|
||||
mutation_fragment(partition_start&& r);
|
||||
mutation_fragment(partition_end&& r);
|
||||
|
||||
mutation_fragment(const mutation_fragment& o)
|
||||
mutation_fragment(const schema& s, const mutation_fragment& o)
|
||||
: _kind(o._kind), _data(std::make_unique<data>()) {
|
||||
switch(_kind) {
|
||||
case kind::static_row:
|
||||
new (&_data->_static_row) static_row(o._data->_static_row);
|
||||
new (&_data->_static_row) static_row(s, o._data->_static_row);
|
||||
break;
|
||||
case kind::clustering_row:
|
||||
new (&_data->_clustering_row) clustering_row(o._data->_clustering_row);
|
||||
new (&_data->_clustering_row) clustering_row(s, o._data->_clustering_row);
|
||||
break;
|
||||
case kind::range_tombstone:
|
||||
new (&_data->_range_tombstone) range_tombstone(o._data->_range_tombstone);
|
||||
@@ -349,14 +352,6 @@ public:
|
||||
}
|
||||
}
|
||||
mutation_fragment(mutation_fragment&& other) = default;
|
||||
mutation_fragment& operator=(const mutation_fragment& other) {
|
||||
if (this != &other) {
|
||||
mutation_fragment copy(other);
|
||||
this->~mutation_fragment();
|
||||
new (this) mutation_fragment(std::move(copy));
|
||||
}
|
||||
return *this;
|
||||
}
|
||||
mutation_fragment& operator=(mutation_fragment&& other) noexcept {
|
||||
if (this != &other) {
|
||||
this->~mutation_fragment();
|
||||
|
||||
@@ -139,14 +139,14 @@ struct reversal_traits<true> {
|
||||
}
|
||||
};
|
||||
|
||||
mutation_partition::mutation_partition(const mutation_partition& x)
|
||||
mutation_partition::mutation_partition(const schema& s, const mutation_partition& x)
|
||||
: _tombstone(x._tombstone)
|
||||
, _static_row(x._static_row)
|
||||
, _static_row(s, column_kind::static_column, x._static_row)
|
||||
, _static_row_continuous(x._static_row_continuous)
|
||||
, _rows()
|
||||
, _row_tombstones(x._row_tombstones) {
|
||||
auto cloner = [] (const auto& x) {
|
||||
return current_allocator().construct<std::remove_const_t<std::remove_reference_t<decltype(x)>>>(x);
|
||||
auto cloner = [&s] (const auto& x) {
|
||||
return current_allocator().construct<rows_entry>(s, x);
|
||||
};
|
||||
_rows.clone_from(x._rows, cloner, current_deleter<rows_entry>());
|
||||
}
|
||||
@@ -154,14 +154,14 @@ mutation_partition::mutation_partition(const mutation_partition& x)
|
||||
mutation_partition::mutation_partition(const mutation_partition& x, const schema& schema,
|
||||
query::clustering_key_filter_ranges ck_ranges)
|
||||
: _tombstone(x._tombstone)
|
||||
, _static_row(x._static_row)
|
||||
, _static_row(schema, column_kind::static_column, x._static_row)
|
||||
, _static_row_continuous(x._static_row_continuous)
|
||||
, _rows()
|
||||
, _row_tombstones(x._row_tombstones, range_tombstone_list::copy_comparator_only()) {
|
||||
try {
|
||||
for(auto&& r : ck_ranges) {
|
||||
for (const rows_entry& e : x.range(schema, r)) {
|
||||
_rows.insert(_rows.end(), *current_allocator().construct<rows_entry>(e), rows_entry::compare(schema));
|
||||
_rows.insert(_rows.end(), *current_allocator().construct<rows_entry>(schema, e), rows_entry::compare(schema));
|
||||
}
|
||||
for (auto&& rt : x._row_tombstones.slice(schema, r)) {
|
||||
_row_tombstones.apply(schema, rt);
|
||||
@@ -210,13 +210,6 @@ mutation_partition::~mutation_partition() {
|
||||
_rows.clear_and_dispose(current_deleter<rows_entry>());
|
||||
}
|
||||
|
||||
mutation_partition&
|
||||
mutation_partition::operator=(const mutation_partition& x) {
|
||||
mutation_partition n(x);
|
||||
std::swap(*this, n);
|
||||
return *this;
|
||||
}
|
||||
|
||||
mutation_partition&
|
||||
mutation_partition::operator=(mutation_partition&& x) noexcept {
|
||||
if (this != &x) {
|
||||
@@ -257,8 +250,8 @@ struct mutation_fragment_applier {
|
||||
_mp.apply_row_tombstone(_s, std::move(rt));
|
||||
}
|
||||
|
||||
void operator()(static_row sr) {
|
||||
_mp.static_row().apply(_s, column_kind::static_column, std::move(sr.cells()));
|
||||
void operator()(const static_row& sr) {
|
||||
_mp.static_row().apply(_s, column_kind::static_column, sr.cells());
|
||||
}
|
||||
|
||||
void operator()(partition_start ps) {
|
||||
@@ -268,9 +261,10 @@ struct mutation_fragment_applier {
|
||||
void operator()(partition_end ps) {
|
||||
}
|
||||
|
||||
void operator()(clustering_row cr) {
|
||||
auto& dr = _mp.clustered_row(_s, std::move(cr.key()));
|
||||
dr.apply(_s, std::move(cr));
|
||||
void operator()(const clustering_row& cr) {
|
||||
auto temp = clustering_row(_s, cr);
|
||||
auto& dr = _mp.clustered_row(_s, std::move(temp.key()));
|
||||
dr.apply(_s, std::move(temp));
|
||||
}
|
||||
};
|
||||
|
||||
@@ -336,7 +330,7 @@ void mutation_partition::apply_monotonically(const schema& s, mutation_partition
|
||||
if (s.version() == p_schema.version()) {
|
||||
apply_monotonically(s, std::move(p), no_cache_tracker);
|
||||
} else {
|
||||
mutation_partition p2(p);
|
||||
mutation_partition p2(s, p);
|
||||
p2.upgrade(p_schema, s);
|
||||
apply_monotonically(s, std::move(p2), no_cache_tracker);
|
||||
}
|
||||
@@ -353,7 +347,7 @@ mutation_partition::apply_weak(const schema& s, mutation_partition_view p, const
|
||||
|
||||
void mutation_partition::apply_weak(const schema& s, const mutation_partition& p, const schema& p_schema) {
|
||||
// FIXME: Optimize
|
||||
apply_monotonically(s, mutation_partition(p), p_schema);
|
||||
apply_monotonically(s, mutation_partition(s, p), p_schema);
|
||||
}
|
||||
|
||||
void mutation_partition::apply_weak(const schema& s, mutation_partition&& p) {
|
||||
@@ -457,7 +451,7 @@ void mutation_partition::insert_row(const schema& s, const clustering_key& key,
|
||||
}
|
||||
|
||||
void mutation_partition::insert_row(const schema& s, const clustering_key& key, const deletable_row& row) {
|
||||
auto e = current_allocator().construct<rows_entry>(key, row);
|
||||
auto e = current_allocator().construct<rows_entry>(s, key, row);
|
||||
_rows.insert(_rows.end(), *e, rows_entry::compare(s));
|
||||
}
|
||||
|
||||
@@ -1062,7 +1056,7 @@ apply_monotonically(const column_definition& def, cell_and_hash& dst,
|
||||
|
||||
void
|
||||
row::apply(const column_definition& column, const atomic_cell_or_collection& value, cell_hash_opt hash) {
|
||||
auto tmp = value;
|
||||
auto tmp = value.copy(*column.type);
|
||||
apply_monotonically(column, std::move(tmp), std::move(hash));
|
||||
}
|
||||
|
||||
@@ -1396,15 +1390,24 @@ rows_entry::rows_entry(rows_entry&& o) noexcept
|
||||
}
|
||||
}
|
||||
|
||||
row::row(const row& o)
|
||||
row::row(const schema& s, column_kind kind, const row& o)
|
||||
: _type(o._type)
|
||||
, _size(o._size)
|
||||
{
|
||||
if (_type == storage_type::vector) {
|
||||
new (&_storage.vector) vector_storage(o._storage.vector);
|
||||
auto& other_vec = o._storage.vector;
|
||||
auto& vec = *new (&_storage.vector) vector_storage;
|
||||
vec.present = other_vec.present;
|
||||
vec.v.reserve(other_vec.v.size());
|
||||
column_id id = 0;
|
||||
for (auto& cell : other_vec.v) {
|
||||
auto& cdef = s.column_at(kind, id++);
|
||||
vec.v.emplace_back(cell_and_hash { cell.cell.copy(*cdef.type), cell.hash });
|
||||
}
|
||||
} else {
|
||||
auto cloner = [] (const auto& x) {
|
||||
return current_allocator().construct<std::remove_const_t<std::remove_reference_t<decltype(x)>>>(x);
|
||||
auto cloner = [&] (const auto& x) {
|
||||
auto& cdef = s.column_at(kind, x.id());
|
||||
return current_allocator().construct<cell_entry>(*cdef.type, x);
|
||||
};
|
||||
new (&_storage.set) map_type;
|
||||
try {
|
||||
@@ -1425,9 +1428,9 @@ row::~row() {
|
||||
}
|
||||
}
|
||||
|
||||
row::cell_entry::cell_entry(const cell_entry& o)
|
||||
row::cell_entry::cell_entry(const abstract_type& type, const cell_entry& o)
|
||||
: _id(o._id)
|
||||
, _cell_and_hash(o._cell_and_hash)
|
||||
, _cell_and_hash{ o._cell_and_hash.cell.copy(type), o._cell_and_hash.hash }
|
||||
{ }
|
||||
|
||||
row::cell_entry::cell_entry(cell_entry&& o) noexcept
|
||||
@@ -1668,7 +1671,7 @@ row row::difference(const schema& s, column_kind kind, const row& other) const
|
||||
}
|
||||
auto& cdef = s.column_at(kind, c.first);
|
||||
if (it == other_range.end() || it->first != c.first) {
|
||||
r.append_cell(c.first, c.second);
|
||||
r.append_cell(c.first, c.second.copy(*cdef.type));
|
||||
} else if (cdef.is_counter()) {
|
||||
auto cell = counter_cell_view::difference(c.second.as_atomic_cell(cdef), it->second.as_atomic_cell(cdef));
|
||||
if (cell) {
|
||||
@@ -1676,7 +1679,7 @@ row row::difference(const schema& s, column_kind kind, const row& other) const
|
||||
}
|
||||
} else if (s.column_at(kind, c.first).is_atomic()) {
|
||||
if (compare_atomic_cell_for_merge(c.second.as_atomic_cell(cdef), it->second.as_atomic_cell(cdef)) > 0) {
|
||||
r.append_cell(c.first, c.second);
|
||||
r.append_cell(c.first, c.second.copy(*cdef.type));
|
||||
}
|
||||
} else {
|
||||
auto ct = static_pointer_cast<const collection_type_impl>(s.column_at(kind, c.first).type);
|
||||
|
||||
@@ -104,7 +104,7 @@ class row {
|
||||
: _id(id)
|
||||
{ }
|
||||
cell_entry(cell_entry&&) noexcept;
|
||||
cell_entry(const cell_entry&);
|
||||
cell_entry(const abstract_type&, const cell_entry&);
|
||||
|
||||
column_id id() const { return _id; }
|
||||
const atomic_cell_or_collection& cell() const { return _cell_and_hash.cell; }
|
||||
@@ -166,7 +166,7 @@ private:
|
||||
public:
|
||||
row();
|
||||
~row();
|
||||
row(const row&);
|
||||
row(const schema&, column_kind, const row&);
|
||||
row(row&& other) noexcept;
|
||||
row& operator=(row&& other) noexcept;
|
||||
size_t size() const { return _size; }
|
||||
@@ -647,8 +647,13 @@ class deletable_row final {
|
||||
public:
|
||||
deletable_row() {}
|
||||
explicit deletable_row(clustering_row&&);
|
||||
deletable_row(row_tombstone tomb, const row_marker& marker, const row& cells)
|
||||
: _deleted_at(tomb), _marker(marker), _cells(cells)
|
||||
deletable_row(const schema& s, const deletable_row& other)
|
||||
: _deleted_at(other._deleted_at)
|
||||
, _marker(other._marker)
|
||||
, _cells(s, column_kind::regular_column, other._cells)
|
||||
{ }
|
||||
deletable_row(const schema& s, row_tombstone tomb, const row_marker& marker, const row& cells)
|
||||
: _deleted_at(tomb), _marker(marker), _cells(s, column_kind::regular_column, cells)
|
||||
{}
|
||||
|
||||
void apply(const schema&, clustering_row);
|
||||
@@ -737,16 +742,16 @@ public:
|
||||
rows_entry(const clustering_key& key, deletable_row&& row)
|
||||
: _key(key), _row(std::move(row))
|
||||
{ }
|
||||
rows_entry(const clustering_key& key, const deletable_row& row)
|
||||
: _key(key), _row(row)
|
||||
rows_entry(const schema& s, const clustering_key& key, const deletable_row& row)
|
||||
: _key(key), _row(s, row)
|
||||
{ }
|
||||
rows_entry(const clustering_key& key, row_tombstone tomb, const row_marker& marker, const row& row)
|
||||
: _key(key), _row(tomb, marker, row)
|
||||
rows_entry(const schema& s, const clustering_key& key, row_tombstone tomb, const row_marker& marker, const row& row)
|
||||
: _key(key), _row(s, tomb, marker, row)
|
||||
{ }
|
||||
rows_entry(rows_entry&& o) noexcept;
|
||||
rows_entry(const rows_entry& e)
|
||||
rows_entry(const schema& s, const rows_entry& e)
|
||||
: _key(e._key)
|
||||
, _row(e._row)
|
||||
, _row(s, e._row)
|
||||
, _flags(e._flags)
|
||||
{ }
|
||||
// Valid only if !dummy()
|
||||
@@ -911,12 +916,11 @@ public:
|
||||
, _row_tombstones(other._row_tombstones, range_tombstone_list::copy_comparator_only())
|
||||
{ }
|
||||
mutation_partition(mutation_partition&&) = default;
|
||||
mutation_partition(const mutation_partition&);
|
||||
mutation_partition(const schema& s, const mutation_partition&);
|
||||
mutation_partition(const mutation_partition&, const schema&, query::clustering_key_filter_ranges);
|
||||
mutation_partition(mutation_partition&&, const schema&, query::clustering_key_filter_ranges);
|
||||
~mutation_partition();
|
||||
static mutation_partition& container_of(rows_type&);
|
||||
mutation_partition& operator=(const mutation_partition& x);
|
||||
mutation_partition& operator=(mutation_partition&& x) noexcept;
|
||||
bool equal(const schema&, const mutation_partition&) const;
|
||||
bool equal(const schema& this_schema, const mutation_partition& p, const schema& p_schema) const;
|
||||
|
||||
@@ -421,7 +421,7 @@ future<mutation_reader_merger::mutation_fragment_batch> mutation_reader_merger::
|
||||
while (!_reader_heap.empty() && key(_fragment_heap).equal(*_schema, key(_reader_heap)));
|
||||
if (_fragment_heap.size() == 1) {
|
||||
_single_reader = { _fragment_heap.back().reader, mutation_fragment::kind::partition_start };
|
||||
_current.emplace_back(_fragment_heap.back().fragment);
|
||||
_current.emplace_back(std::move(_fragment_heap.back().fragment));
|
||||
_fragment_heap.clear();
|
||||
return make_ready_future<mutation_fragment_batch>(_current);
|
||||
}
|
||||
@@ -951,7 +951,7 @@ future<> foreign_reader::fill_buffer(db::timeout_clock::time_point timeout) {
|
||||
_end_of_stream = end_of_steam;
|
||||
for (const auto& mf : *buffer) {
|
||||
// Need a copy since the mf is on the remote shard.
|
||||
push_mutation_fragment(mf);
|
||||
push_mutation_fragment(mutation_fragment(*_schema, mf));
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@@ -212,7 +212,7 @@ class partition_snapshot_flat_reader : public flat_mutation_reader::impl, public
|
||||
if (_digest_requested) {
|
||||
e.row().cells().prepare_hash(_schema, column_kind::regular_column);
|
||||
}
|
||||
auto result = mutation_fragment(mutation_fragment::clustering_row_tag_t(), e);
|
||||
auto result = mutation_fragment(mutation_fragment::clustering_row_tag_t(), _schema, e);
|
||||
while (has_more_rows() && _eq(peek_row().position(), result.as_clustering_row().position())) {
|
||||
const rows_entry& e = pop_clustering_row();
|
||||
if (_digest_requested) {
|
||||
|
||||
@@ -361,7 +361,7 @@ public:
|
||||
if (digest_requested) {
|
||||
row->row().cells().prepare_hash(_schema, column_kind::regular_column);
|
||||
}
|
||||
auto mf = mutation_fragment(clustering_row(*row));
|
||||
auto mf = mutation_fragment(clustering_row(_schema, *row));
|
||||
auto& cr = mf.as_mutable_clustering_row();
|
||||
for (++it; it != _current_row.end(); ++it) {
|
||||
cr.apply(_schema, *it->it);
|
||||
@@ -377,7 +377,7 @@ public:
|
||||
if (v.unique_owner) {
|
||||
consumer(std::move(v.it->row()));
|
||||
} else {
|
||||
consumer(deletable_row(v.it->row()));
|
||||
consumer(deletable_row(_schema, v.it->row()));
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -413,7 +413,7 @@ public:
|
||||
} else {
|
||||
// Copy row from older version because rows in evictable versions must
|
||||
// hold values which are independently complete to be consistent on eviction.
|
||||
auto e = current_allocator().construct<rows_entry>(*_current_row[0].it);
|
||||
auto e = current_allocator().construct<rows_entry>(_schema, *_current_row[0].it);
|
||||
e->set_continuous(latest_i != rows.end() && latest_i->continuous());
|
||||
_snp.tracker()->insert(*e);
|
||||
rows.insert_before(latest_i, *e);
|
||||
|
||||
@@ -108,14 +108,14 @@ concept bool Reducer() {
|
||||
// the version chain starting from v.
|
||||
// |map| extracts the part from each version.
|
||||
// |reduce| Combines parts from the two versions.
|
||||
template <typename Result, typename Map, typename Reduce>
|
||||
template <typename Result, typename Map, typename Initial, typename Reduce>
|
||||
GCC6_CONCEPT(
|
||||
requires Mapper<Map, mutation_partition, Result>() && Reducer<Reduce, Result>()
|
||||
)
|
||||
inline Result squashed(const partition_version_ref& v, Map&& map, Reduce&& reduce) {
|
||||
inline Result squashed(const partition_version_ref& v, Map&& map, Initial&& initial, Reduce&& reduce) {
|
||||
const partition_version* this_v = &*v;
|
||||
partition_version* it = v->last();
|
||||
Result r = map(it->partition());
|
||||
Result r = initial(map(it->partition()));
|
||||
while (it != this_v) {
|
||||
it = it->prev();
|
||||
reduce(r, map(it->partition()));
|
||||
@@ -123,6 +123,16 @@ inline Result squashed(const partition_version_ref& v, Map&& map, Reduce&& reduc
|
||||
return r;
|
||||
}
|
||||
|
||||
template <typename Result, typename Map, typename Reduce>
|
||||
GCC6_CONCEPT(
|
||||
requires Mapper<Map, mutation_partition, Result>() && Reducer<Reduce, Result>()
|
||||
)
|
||||
inline Result squashed(const partition_version_ref& v, Map&& map, Reduce&& reduce) {
|
||||
return squashed<Result>(v, map,
|
||||
[] (auto&& o) -> decltype(auto) { return std::forward<decltype(o)>(o); },
|
||||
reduce);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
::static_row partition_snapshot::static_row(bool digest_requested) const {
|
||||
@@ -133,6 +143,7 @@ inline Result squashed(const partition_version_ref& v, Map&& map, Reduce&& reduc
|
||||
}
|
||||
return mp.static_row();
|
||||
},
|
||||
[this] (const row& r) { return row(*_schema, column_kind::static_column, r); },
|
||||
[this] (row& a, const row& b) { a.apply(*_schema, column_kind::static_column, b); }));
|
||||
}
|
||||
|
||||
@@ -149,6 +160,7 @@ tombstone partition_snapshot::partition_tombstone() const {
|
||||
mutation_partition partition_snapshot::squashed() const {
|
||||
return ::squashed<mutation_partition>(version(),
|
||||
[] (const mutation_partition& mp) -> const mutation_partition& { return mp; },
|
||||
[this] (const mutation_partition& mp) { return mutation_partition(*_schema, mp); },
|
||||
[this] (mutation_partition& a, const mutation_partition& b) { a.apply(*_schema, b, *_schema); });
|
||||
}
|
||||
|
||||
@@ -222,7 +234,7 @@ partition_entry partition_entry::make_evictable(const schema& s, mutation_partit
|
||||
}
|
||||
|
||||
partition_entry partition_entry::make_evictable(const schema& s, const mutation_partition& mp) {
|
||||
return make_evictable(s, mutation_partition(mp));
|
||||
return make_evictable(s, mutation_partition(s, mp));
|
||||
}
|
||||
|
||||
partition_entry::~partition_entry() {
|
||||
@@ -300,7 +312,7 @@ partition_version& partition_entry::add_version(const schema& s, cache_tracker*
|
||||
|
||||
void partition_entry::apply(const schema& s, const mutation_partition& mp, const schema& mp_schema)
|
||||
{
|
||||
apply(s, mutation_partition(mp), mp_schema);
|
||||
apply(s, mutation_partition(s, mp), mp_schema);
|
||||
}
|
||||
|
||||
void partition_entry::apply(const schema& s, mutation_partition&& mp, const schema& mp_schema)
|
||||
@@ -385,7 +397,7 @@ public:
|
||||
// due to the fact that all rows are continuous.
|
||||
for (version& v : _current_row) {
|
||||
if (!v.can_move) {
|
||||
consumer(deletable_row(v.current_row->row()));
|
||||
consumer(deletable_row(_schema, v.current_row->row()));
|
||||
} else {
|
||||
consumer(std::move(v.current_row->row()));
|
||||
}
|
||||
@@ -541,7 +553,7 @@ mutation_partition partition_entry::squashed(schema_ptr from, schema_ptr to)
|
||||
mutation_partition mp(to);
|
||||
mp.set_static_row_continuous(_version->partition().static_row_continuous());
|
||||
for (auto&& v : _version->all_elements()) {
|
||||
auto older = v.partition();
|
||||
auto older = mutation_partition(*from, v.partition());
|
||||
if (from->version() != to->version()) {
|
||||
older.upgrade(*from, *to);
|
||||
}
|
||||
|
||||
@@ -100,7 +100,7 @@ public:
|
||||
cache_entry(schema_ptr s, const dht::decorated_key& key, const mutation_partition& p)
|
||||
: _schema(std::move(s))
|
||||
, _key(key)
|
||||
, _pe(partition_entry::make_evictable(*_schema, mutation_partition(p)))
|
||||
, _pe(partition_entry::make_evictable(*_schema, mutation_partition(*_schema, p)))
|
||||
{ }
|
||||
|
||||
cache_entry(schema_ptr s, dht::decorated_key&& key, mutation_partition&& p)
|
||||
|
||||
@@ -2257,7 +2257,7 @@ private:
|
||||
|
||||
static primary_key get_last_reconciled_row(const schema& s, const mutation_and_live_row_count& m_a_rc, const query::read_command& cmd, uint32_t limit, bool is_reversed) {
|
||||
const auto& m = m_a_rc.mut;
|
||||
auto mp = m.partition();
|
||||
auto mp = mutation_partition(s, m.partition());
|
||||
auto&& ranges = cmd.slice.row_ranges(s, m.key());
|
||||
mp.compact_for_query(s, cmd.timestamp, ranges, is_reversed, limit);
|
||||
|
||||
@@ -2516,7 +2516,7 @@ public:
|
||||
for (const version& v : z.get<0>()) {
|
||||
auto diff = v.par
|
||||
? m.partition().difference(schema, v.par->mut().unfreeze(schema).partition())
|
||||
: m.partition();
|
||||
: mutation_partition(*schema, m.partition());
|
||||
auto it = _diffs[m.token()].find(v.from);
|
||||
std::experimental::optional<mutation> mdiff;
|
||||
if (!diff.empty()) {
|
||||
|
||||
@@ -133,7 +133,7 @@ SEASTAR_TEST_CASE(test_disjoint_mutations) {
|
||||
});
|
||||
|
||||
auto m3 = mutation(s, partition_key::from_single_value(*s, to_bytes("1")));
|
||||
m3.partition() = m1.partition();
|
||||
m3.partition() = mutation_partition(*s, m1.partition());
|
||||
|
||||
auto l1 = cl.lock_cells(m1.decorated_key(), partition_cells_range(m1.partition()), no_timeout).get0();
|
||||
auto l2 = cl.lock_cells(m2.decorated_key(), partition_cells_range(m2.partition()), no_timeout).get0();
|
||||
|
||||
@@ -92,7 +92,9 @@ SEASTAR_TEST_CASE(test_apply) {
|
||||
return seastar::async([] {
|
||||
auto cdef = column_definition("name", counter_type, column_kind::regular_column);
|
||||
|
||||
auto verify_apply = [&] (atomic_cell_or_collection dst, atomic_cell_or_collection src, int64_t value) {
|
||||
auto verify_apply = [&] (const atomic_cell_or_collection& a, const atomic_cell_or_collection& b, int64_t value) {
|
||||
auto dst = a.copy(*cdef.type);
|
||||
auto src = b.copy(*cdef.type);
|
||||
counter_cell_view::apply(cdef, dst, src);
|
||||
|
||||
auto cv = counter_cell_view(dst.as_atomic_cell(cdef));
|
||||
|
||||
@@ -82,7 +82,7 @@ struct mock_consumer {
|
||||
}
|
||||
result consume_end_of_stream() {
|
||||
_result._consume_end_of_stream_called = true;
|
||||
return _result;
|
||||
return std::move(_result);
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
@@ -512,7 +512,7 @@ SEASTAR_TEST_CASE(test_hash_is_cached) {
|
||||
{
|
||||
auto rd = mt->make_flat_reader(s);
|
||||
rd().get0()->as_partition_start();
|
||||
clustering_row row = rd().get0()->as_clustering_row();
|
||||
clustering_row row = std::move(rd().get0()->as_mutable_clustering_row());
|
||||
BOOST_REQUIRE(!row.cells().cell_hash_for(0));
|
||||
}
|
||||
|
||||
@@ -521,14 +521,14 @@ SEASTAR_TEST_CASE(test_hash_is_cached) {
|
||||
slice.options.set<query::partition_slice::option::with_digest>();
|
||||
auto rd = mt->make_flat_reader(s, query::full_partition_range, slice);
|
||||
rd().get0()->as_partition_start();
|
||||
clustering_row row = rd().get0()->as_clustering_row();
|
||||
clustering_row row = std::move(rd().get0()->as_mutable_clustering_row());
|
||||
BOOST_REQUIRE(row.cells().cell_hash_for(0));
|
||||
}
|
||||
|
||||
{
|
||||
auto rd = mt->make_flat_reader(s);
|
||||
rd().get0()->as_partition_start();
|
||||
clustering_row row = rd().get0()->as_clustering_row();
|
||||
clustering_row row = std::move(rd().get0()->as_mutable_clustering_row());
|
||||
BOOST_REQUIRE(row.cells().cell_hash_for(0));
|
||||
}
|
||||
|
||||
@@ -538,7 +538,7 @@ SEASTAR_TEST_CASE(test_hash_is_cached) {
|
||||
{
|
||||
auto rd = mt->make_flat_reader(s);
|
||||
rd().get0()->as_partition_start();
|
||||
clustering_row row = rd().get0()->as_clustering_row();
|
||||
clustering_row row = std::move(rd().get0()->as_mutable_clustering_row());
|
||||
BOOST_REQUIRE(!row.cells().cell_hash_for(0));
|
||||
}
|
||||
|
||||
@@ -547,14 +547,14 @@ SEASTAR_TEST_CASE(test_hash_is_cached) {
|
||||
slice.options.set<query::partition_slice::option::with_digest>();
|
||||
auto rd = mt->make_flat_reader(s, query::full_partition_range, slice);
|
||||
rd().get0()->as_partition_start();
|
||||
clustering_row row = rd().get0()->as_clustering_row();
|
||||
clustering_row row = std::move(rd().get0()->as_mutable_clustering_row());
|
||||
BOOST_REQUIRE(row.cells().cell_hash_for(0));
|
||||
}
|
||||
|
||||
{
|
||||
auto rd = mt->make_flat_reader(s);
|
||||
rd().get0()->as_partition_start();
|
||||
clustering_row row = rd().get0()->as_clustering_row();
|
||||
clustering_row row = std::move(rd().get0()->as_mutable_clustering_row());
|
||||
BOOST_REQUIRE(row.cells().cell_hash_for(0));
|
||||
}
|
||||
});
|
||||
|
||||
@@ -781,7 +781,7 @@ SEASTAR_TEST_CASE(test_apply_monotonically_is_monotonic) {
|
||||
size_t fail_offset = 0;
|
||||
while (true) {
|
||||
mutation m = target;
|
||||
mutation_partition m2 = second.partition();
|
||||
auto m2 = mutation_partition(*m.schema(), second.partition());
|
||||
alloc.fail_after(fail_offset++);
|
||||
try {
|
||||
m.partition().apply_monotonically(*m.schema(), std::move(m2), no_cache_tracker);
|
||||
@@ -1293,7 +1293,7 @@ SEASTAR_TEST_CASE(test_slicing_mutation) {
|
||||
|
||||
auto test_slicing = [&] (query::clustering_row_ranges ranges, std::vector<int> expected_rows) {
|
||||
mutation_partition mp1(m.partition(), *s, ranges);
|
||||
auto mp_temp = m.partition();
|
||||
auto mp_temp = mutation_partition(*s, m.partition());
|
||||
mutation_partition mp2(std::move(mp_temp), *s, ranges);
|
||||
|
||||
BOOST_REQUIRE(mp1.equal(*s, mp2));
|
||||
|
||||
@@ -95,7 +95,7 @@ SEASTAR_TEST_CASE(test_range_tombstone_slicing) {
|
||||
m1.apply_delete(*s, rt2);
|
||||
m1.apply_delete(*s, rt3);
|
||||
|
||||
partition_entry e(m1);
|
||||
partition_entry e(mutation_partition(*s, m1));
|
||||
|
||||
auto snap = e.read(r, cleaner, s, no_cache_tracker);
|
||||
|
||||
@@ -263,7 +263,7 @@ mvcc_partition& mvcc_partition::operator+=(const mutation& m) {
|
||||
void mvcc_partition::apply(const mutation_partition& mp, schema_ptr mp_s) {
|
||||
with_allocator(region().allocator(), [&] {
|
||||
if (_evictable) {
|
||||
apply_to_evictable(partition_entry(mp), mp_s);
|
||||
apply_to_evictable(partition_entry(mutation_partition(*mp_s, mp)), mp_s);
|
||||
} else {
|
||||
logalloc::allocating_section as;
|
||||
as(region(), [&] {
|
||||
@@ -473,12 +473,12 @@ SEASTAR_TEST_CASE(test_apply_to_incomplete_respects_continuity) {
|
||||
auto before = e.squashed();
|
||||
auto e_continuity = before.get_continuity(*s);
|
||||
|
||||
auto expected_to_apply_slice = to_apply.partition();
|
||||
auto expected_to_apply_slice = mutation_partition(*s, to_apply.partition());
|
||||
if (!before.static_row_continuous()) {
|
||||
expected_to_apply_slice.static_row() = {};
|
||||
}
|
||||
|
||||
auto expected = before;
|
||||
auto expected = mutation_partition(*s, before);
|
||||
expected.apply_weak(*s, std::move(expected_to_apply_slice));
|
||||
|
||||
e += to_apply;
|
||||
@@ -565,7 +565,7 @@ SEASTAR_TEST_CASE(test_snapshot_cursor_is_consistent_with_merging_for_nonevictab
|
||||
|
||||
{
|
||||
logalloc::reclaim_lock rl(r);
|
||||
auto e = partition_entry(m3.partition());
|
||||
auto e = partition_entry(mutation_partition(*s, m3.partition()));
|
||||
auto snap1 = e.read(r, cleaner, s, no_cache_tracker);
|
||||
e.apply(*s, m2.partition(), *s);
|
||||
auto snap2 = e.read(r, cleaner, s, no_cache_tracker);
|
||||
@@ -609,7 +609,7 @@ SEASTAR_TEST_CASE(test_continuity_merging_in_evictable) {
|
||||
e.add_version(*s, &tracker).partition()
|
||||
.clustered_row(*s, ss.make_ckey(2), is_dummy::no, is_continuous::no);
|
||||
|
||||
auto expected = m1.partition();
|
||||
auto expected = mutation_partition(*s, m1.partition());
|
||||
expected.clustered_row(*s, ss.make_ckey(1), is_dummy::no, is_continuous::no);
|
||||
expected.clustered_row(*s, ss.make_ckey(2), is_dummy::no, is_continuous::no);
|
||||
|
||||
@@ -811,8 +811,8 @@ SEASTAR_TEST_CASE(test_apply_is_atomic) {
|
||||
|
||||
size_t fail_offset = 0;
|
||||
while (true) {
|
||||
mutation_partition m2 = second.partition();
|
||||
auto e = partition_entry(target.partition());
|
||||
mutation_partition m2 = mutation_partition(*second.schema(), second.partition());
|
||||
auto e = partition_entry(mutation_partition(*target.schema(), target.partition()));
|
||||
//auto snap1 = e.read(r, gen.schema());
|
||||
|
||||
alloc.fail_after(fail_offset++);
|
||||
@@ -858,7 +858,7 @@ SEASTAR_TEST_CASE(test_versions_are_merged_when_snapshots_go_away) {
|
||||
m3.partition().make_fully_continuous();
|
||||
|
||||
{
|
||||
auto e = partition_entry(m1.partition());
|
||||
auto e = partition_entry(mutation_partition(*s, m1.partition()));
|
||||
auto snap1 = e.read(r, cleaner, s, nullptr);
|
||||
|
||||
{
|
||||
@@ -879,7 +879,7 @@ SEASTAR_TEST_CASE(test_versions_are_merged_when_snapshots_go_away) {
|
||||
}
|
||||
|
||||
{
|
||||
auto e = partition_entry(m1.partition());
|
||||
auto e = partition_entry(mutation_partition(*s, m1.partition()));
|
||||
auto snap1 = e.read(r, cleaner, s, nullptr);
|
||||
|
||||
{
|
||||
|
||||
@@ -97,19 +97,19 @@ PERF_TEST_F(clustering_row, make_1M)
|
||||
|
||||
PERF_TEST_F(clustering_row, copy_4)
|
||||
{
|
||||
auto mf = mutation_fragment(clustering_row_4());
|
||||
auto mf = mutation_fragment(*schema(), clustering_row_4());
|
||||
perf_tests::do_not_optimize(mf);
|
||||
}
|
||||
|
||||
PERF_TEST_F(clustering_row, copy_4k)
|
||||
{
|
||||
auto mf = mutation_fragment(clustering_row_4k());
|
||||
auto mf = mutation_fragment(*schema(), clustering_row_4k());
|
||||
perf_tests::do_not_optimize(mf);
|
||||
}
|
||||
|
||||
PERF_TEST_F(clustering_row, copy_1M)
|
||||
{
|
||||
auto mf = mutation_fragment(clustering_row_1M());
|
||||
auto mf = mutation_fragment(*schema(), clustering_row_1M());
|
||||
perf_tests::do_not_optimize(mf);
|
||||
}
|
||||
|
||||
|
||||
@@ -952,7 +952,7 @@ SEASTAR_TEST_CASE(test_update_failure) {
|
||||
auto original_partitions = partitions_type(partition_key::less_compare(*s));
|
||||
for (int i = 0; i < partition_count / 2; i++) {
|
||||
auto m = make_new_mutation(s, i + partition_count / 2);
|
||||
original_partitions.emplace(m.key(), m.partition());
|
||||
original_partitions.emplace(m.key(), mutation_partition(*s, m.partition()));
|
||||
cache.populate(m);
|
||||
}
|
||||
|
||||
@@ -961,7 +961,7 @@ SEASTAR_TEST_CASE(test_update_failure) {
|
||||
auto updated_partitions = partitions_type(partition_key::less_compare(*s));
|
||||
for (int i = 0; i < partition_count; i++) {
|
||||
auto m = make_new_large_mutation(s, i);
|
||||
updated_partitions.emplace(m.key(), m.partition());
|
||||
updated_partitions.emplace(m.key(), mutation_partition(*s, m.partition()));
|
||||
mt->apply(m);
|
||||
}
|
||||
|
||||
@@ -3120,7 +3120,7 @@ SEASTAR_TEST_CASE(test_hash_is_cached) {
|
||||
{
|
||||
auto rd = cache.make_reader(s);
|
||||
rd().get0()->as_partition_start();
|
||||
clustering_row row = rd().get0()->as_clustering_row();
|
||||
clustering_row row = std::move(rd().get0()->as_mutable_clustering_row());
|
||||
BOOST_REQUIRE(!row.cells().cell_hash_for(0));
|
||||
}
|
||||
|
||||
@@ -3129,14 +3129,14 @@ SEASTAR_TEST_CASE(test_hash_is_cached) {
|
||||
slice.options.set<query::partition_slice::option::with_digest>();
|
||||
auto rd = cache.make_reader(s, query::full_partition_range, slice);
|
||||
rd().get0()->as_partition_start();
|
||||
clustering_row row = rd().get0()->as_clustering_row();
|
||||
clustering_row row = std::move(rd().get0()->as_mutable_clustering_row());
|
||||
BOOST_REQUIRE(row.cells().cell_hash_for(0));
|
||||
}
|
||||
|
||||
{
|
||||
auto rd = cache.make_reader(s);
|
||||
rd().get0()->as_partition_start();
|
||||
clustering_row row = rd().get0()->as_clustering_row();
|
||||
clustering_row row = std::move(rd().get0()->as_mutable_clustering_row());
|
||||
BOOST_REQUIRE(row.cells().cell_hash_for(0));
|
||||
}
|
||||
|
||||
@@ -3147,7 +3147,7 @@ SEASTAR_TEST_CASE(test_hash_is_cached) {
|
||||
{
|
||||
auto rd = cache.make_reader(s);
|
||||
rd().get0()->as_partition_start();
|
||||
clustering_row row = rd().get0()->as_clustering_row();
|
||||
clustering_row row = std::move(rd().get0()->as_mutable_clustering_row());
|
||||
BOOST_REQUIRE(!row.cells().cell_hash_for(0));
|
||||
}
|
||||
|
||||
@@ -3156,14 +3156,14 @@ SEASTAR_TEST_CASE(test_hash_is_cached) {
|
||||
slice.options.set<query::partition_slice::option::with_digest>();
|
||||
auto rd = cache.make_reader(s, query::full_partition_range, slice);
|
||||
rd().get0()->as_partition_start();
|
||||
clustering_row row = rd().get0()->as_clustering_row();
|
||||
clustering_row row = std::move(rd().get0()->as_mutable_clustering_row());
|
||||
BOOST_REQUIRE(row.cells().cell_hash_for(0));
|
||||
}
|
||||
|
||||
{
|
||||
auto rd = cache.make_reader(s);
|
||||
rd().get0()->as_partition_start();
|
||||
clustering_row row = rd().get0()->as_clustering_row();
|
||||
clustering_row row = std::move(rd().get0()->as_mutable_clustering_row());
|
||||
BOOST_REQUIRE(row.cells().cell_hash_for(0));
|
||||
}
|
||||
});
|
||||
|
||||
@@ -1415,7 +1415,7 @@ SEASTAR_TEST_CASE(datafile_generation_37) {
|
||||
|
||||
auto clustering = clustering_key_prefix::from_exploded(*s, {to_bytes("cl1")});
|
||||
|
||||
auto row = mp.clustered_row(*s, clustering);
|
||||
auto& row = mp.clustered_row(*s, clustering);
|
||||
match_live_cell(row.cells(), *s, "cl2", data_value(to_bytes("cl2")));
|
||||
return make_ready_future<>();
|
||||
});
|
||||
@@ -1449,7 +1449,7 @@ SEASTAR_TEST_CASE(datafile_generation_38) {
|
||||
auto& mp = mutation->partition();
|
||||
auto clustering = clustering_key_prefix::from_exploded(*s, {to_bytes("cl1"), to_bytes("cl2")});
|
||||
|
||||
auto row = mp.clustered_row(*s, clustering);
|
||||
auto& row = mp.clustered_row(*s, clustering);
|
||||
match_live_cell(row.cells(), *s, "cl3", data_value(to_bytes("cl3")));
|
||||
return make_ready_future<>();
|
||||
});
|
||||
@@ -1483,7 +1483,7 @@ SEASTAR_TEST_CASE(datafile_generation_39) {
|
||||
auto rd = make_lw_shared<flat_mutation_reader>(sstp->read_row_flat(s, key));
|
||||
return read_mutation_from_flat_mutation_reader(*rd).then([sstp, s, rd] (auto mutation) {
|
||||
auto& mp = mutation->partition();
|
||||
auto row = mp.clustered_row(*s, clustering_key::make_empty());
|
||||
auto& row = mp.clustered_row(*s, clustering_key::make_empty());
|
||||
match_live_cell(row.cells(), *s, "cl1", data_value(data_value(to_bytes("cl1"))));
|
||||
match_live_cell(row.cells(), *s, "cl2", data_value(data_value(to_bytes("cl2"))));
|
||||
return make_ready_future<>();
|
||||
@@ -1580,7 +1580,7 @@ SEASTAR_TEST_CASE(datafile_generation_41) {
|
||||
return read_mutation_from_flat_mutation_reader(*rd).then([sstp, s, tomb, rd] (auto mutation) {
|
||||
auto& mp = mutation->partition();
|
||||
BOOST_REQUIRE(mp.clustered_rows().calculate_size() == 1);
|
||||
auto c_row = *(mp.clustered_rows().begin());
|
||||
auto& c_row = *(mp.clustered_rows().begin());
|
||||
BOOST_REQUIRE(c_row.row().deleted_at().tomb() == tomb);
|
||||
});
|
||||
});
|
||||
|
||||
@@ -142,14 +142,14 @@ future<mutation> generate_clustered(bytes&& key) {
|
||||
inline auto clustered_row(mutation& mutation, const schema& s, std::vector<bytes>&& v) {
|
||||
auto exploded = exploded_clustering_prefix(std::move(v));
|
||||
auto clustering_pair = clustering_key::from_clustering_prefix(s, exploded);
|
||||
return mutation.partition().clustered_row(s, clustering_pair);
|
||||
return deletable_row(s, mutation.partition().clustered_row(s, clustering_pair));
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(complex_sst1_k1) {
|
||||
return generate_clustered<1>("key1").then([] (auto&& mutation) {
|
||||
auto s = complex_schema();
|
||||
|
||||
auto sr = mutation.partition().static_row();
|
||||
auto& sr = mutation.partition().static_row();
|
||||
match_live_cell(sr, *s, "static_obj", data_value(to_bytes("static_value")));
|
||||
|
||||
auto row1 = clustered_row(mutation, *s, {"cl1.1", "cl2.1"});
|
||||
@@ -178,7 +178,7 @@ SEASTAR_TEST_CASE(complex_sst1_k2) {
|
||||
return generate_clustered<1>("key2").then([] (auto&& mutation) {
|
||||
auto s = complex_schema();
|
||||
|
||||
auto sr = mutation.partition().static_row();
|
||||
auto& sr = mutation.partition().static_row();
|
||||
match_live_cell(sr, *s, "static_obj", data_value(to_bytes("static_value")));
|
||||
auto static_set = match_collection(sr, *s, "static_collection", tombstone(deletion_time{1431451390, 1431451390225257l}));
|
||||
match_collection_element<status::live>(static_set.cells[0], to_bytes("1"), bytes_opt{});
|
||||
@@ -227,7 +227,7 @@ SEASTAR_TEST_CASE(complex_sst2_k2) {
|
||||
return generate_clustered<2>("key2").then([] (auto&& mutation) {
|
||||
auto s = complex_schema();
|
||||
|
||||
auto sr = mutation.partition().static_row();
|
||||
auto& sr = mutation.partition().static_row();
|
||||
match_dead_cell(sr, *s, "static_obj");
|
||||
auto static_set = match_collection(sr, *s, "static_collection", tombstone(deletion_time{0, api::missing_timestamp}));
|
||||
match_collection_element<status::dead>(static_set.cells[0], to_bytes("1"), bytes_opt{});
|
||||
@@ -256,7 +256,7 @@ SEASTAR_TEST_CASE(complex_sst2_k3) {
|
||||
return generate_clustered<2>("key3").then([] (auto&& mutation) {
|
||||
auto s = complex_schema();
|
||||
|
||||
auto sr = mutation.partition().static_row();
|
||||
auto& sr = mutation.partition().static_row();
|
||||
match_expiring_cell(sr, *s, "static_obj", data_value(to_bytes("static_value_3")), 1431451394597062l, 1431537794);
|
||||
|
||||
auto row1 = clustered_row(mutation, *s, {"tcl1.1", "tcl2.1"});
|
||||
@@ -294,7 +294,7 @@ SEASTAR_TEST_CASE(complex_sst3_k2) {
|
||||
return generate_clustered<3>("key2").then([] (auto&& mutation) {
|
||||
auto s = complex_schema();
|
||||
|
||||
auto sr = mutation.partition().static_row();
|
||||
auto& sr = mutation.partition().static_row();
|
||||
match_live_cell(sr, *s, "static_obj", data_value(to_bytes("final_static")));
|
||||
|
||||
auto row = clustered_row(mutation, *s, {"kcl1.1", "kcl2.1"});
|
||||
@@ -451,7 +451,7 @@ SEASTAR_TEST_CASE(compact_storage_sparse_read) {
|
||||
return read_mutation_from_flat_mutation_reader(*rd).then([sstp, s, &key, rd] (auto mutation) {
|
||||
BOOST_REQUIRE(mutation);
|
||||
auto& mp = mutation->partition();
|
||||
auto row = mp.clustered_row(*s, clustering_key::make_empty());
|
||||
auto& row = mp.clustered_row(*s, clustering_key::make_empty());
|
||||
match_live_cell(row.cells(), *s, "cl1", data_value(to_bytes("cl1")));
|
||||
match_live_cell(row.cells(), *s, "cl2", data_value(to_bytes("cl2")));
|
||||
return make_ready_future<>();
|
||||
@@ -471,7 +471,7 @@ SEASTAR_TEST_CASE(compact_storage_simple_dense_read) {
|
||||
auto exploded = exploded_clustering_prefix({"cl1"});
|
||||
auto clustering = clustering_key::from_clustering_prefix(*s, exploded);
|
||||
|
||||
auto row = mp.clustered_row(*s, clustering);
|
||||
auto& row = mp.clustered_row(*s, clustering);
|
||||
match_live_cell(row.cells(), *s, "cl2", data_value(to_bytes("cl2")));
|
||||
return make_ready_future<>();
|
||||
});
|
||||
@@ -490,7 +490,7 @@ SEASTAR_TEST_CASE(compact_storage_dense_read) {
|
||||
auto exploded = exploded_clustering_prefix({"cl1", "cl2"});
|
||||
auto clustering = clustering_key::from_clustering_prefix(*s, exploded);
|
||||
|
||||
auto row = mp.clustered_row(*s, clustering);
|
||||
auto& row = mp.clustered_row(*s, clustering);
|
||||
match_live_cell(row.cells(), *s, "cl3", data_value(to_bytes("cl3")));
|
||||
return make_ready_future<>();
|
||||
});
|
||||
@@ -515,10 +515,10 @@ SEASTAR_TEST_CASE(broken_ranges_collection) {
|
||||
if (!mut) {
|
||||
return stop_iteration::yes;
|
||||
} else if (key_equal("127.0.0.1")) {
|
||||
auto row = mut->partition().clustered_row(*s, clustering_key::make_empty());
|
||||
auto& row = mut->partition().clustered_row(*s, clustering_key::make_empty());
|
||||
match_absent(row.cells(), *s, "tokens");
|
||||
} else if (key_equal("127.0.0.3")) {
|
||||
auto row = mut->partition().clustered_row(*s, clustering_key::make_empty());
|
||||
auto& row = mut->partition().clustered_row(*s, clustering_key::make_empty());
|
||||
auto tokens = match_collection(row.cells(), *s, "tokens", tombstone(deletion_time{0x55E5F2D5, 0x051EB3FC99715Dl }));
|
||||
match_collection_element<status::live>(tokens.cells[0], to_bytes("-8180144272884242102"), bytes_opt{});
|
||||
} else {
|
||||
@@ -610,7 +610,7 @@ SEASTAR_TEST_CASE(tombstone_in_tombstone) {
|
||||
tombstone(1459334681228103LL, it->tomb.deletion_time))));
|
||||
auto& rows = mut->partition().clustered_rows();
|
||||
BOOST_REQUIRE(rows.calculate_size() == 1);
|
||||
for (auto e : rows) {
|
||||
for (auto& e : rows) {
|
||||
BOOST_REQUIRE(e.key().equal(*s, make_ckey("aaa", "bbb")));
|
||||
BOOST_REQUIRE(e.row().deleted_at().tomb().timestamp == 1459334681244989LL);
|
||||
}
|
||||
@@ -755,7 +755,7 @@ SEASTAR_TEST_CASE(tombstone_in_tombstone2) {
|
||||
BOOST_REQUIRE(it == rts.end());
|
||||
|
||||
BOOST_REQUIRE(rows.calculate_size() == 1);
|
||||
for (auto e : rows) {
|
||||
for (auto& e : rows) {
|
||||
BOOST_REQUIRE(e.key().equal(*s, make_ckey("aaa", "bbb", "ccc")));
|
||||
BOOST_REQUIRE(e.row().deleted_at().tomb().timestamp == 1459438519958850LL);
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user