mirror of
https://github.com/scylladb/scylladb.git
synced 2026-05-12 19:02:12 +00:00
atomic_cell: introduce fragmented buffer value interface
As a prepratation for the switch to the new cell representation this patch changes the type returned by atomic_cell_view::value() to one that requires explicit linearisation of the cell value. Even though the value is still implicitly linearised (and only when managed by the LSA) the new interface is the same as the target one so that no more changes to its users will be needed.
This commit is contained in:
126
atomic_cell.hh
126
atomic_cell.hh
@@ -48,6 +48,108 @@ T get_field(const bytes_view& v, unsigned offset) {
|
||||
|
||||
class atomic_cell_or_collection;
|
||||
|
||||
template<mutable_view is_mutable>
|
||||
class basic_atomic_cell_value_view {
|
||||
public:
|
||||
using fragment_type = std::conditional_t<is_mutable == mutable_view::no,
|
||||
bytes_view, bytes_mutable_view>;
|
||||
private:
|
||||
fragment_type _value;
|
||||
public:
|
||||
explicit basic_atomic_cell_value_view(fragment_type value) : _value(value) { }
|
||||
|
||||
class iterator {
|
||||
fragment_type _view;
|
||||
public:
|
||||
using iterator_category = std::forward_iterator_tag;
|
||||
using value_type = fragment_type;
|
||||
using pointer = const fragment_type*;
|
||||
using reference = const fragment_type&;
|
||||
using difference_type = std::ptrdiff_t;
|
||||
|
||||
explicit iterator(fragment_type fv) noexcept
|
||||
: _view(fv) { }
|
||||
|
||||
const fragment_type& operator*() const {
|
||||
return _view;
|
||||
}
|
||||
const fragment_type* operator->() const {
|
||||
return &_view;
|
||||
}
|
||||
iterator& operator++() {
|
||||
_view = { };
|
||||
return *this;
|
||||
}
|
||||
iterator operator++(int) {
|
||||
auto it = *this;
|
||||
operator++();
|
||||
return it;
|
||||
}
|
||||
|
||||
bool operator==(const iterator& other) const {
|
||||
return _view.data() == other._view.data();
|
||||
}
|
||||
bool operator!=(const iterator& other) const {
|
||||
return !(*this == other);
|
||||
}
|
||||
};
|
||||
|
||||
using const_iterator = iterator;
|
||||
|
||||
auto begin() const {
|
||||
return iterator(_value);
|
||||
}
|
||||
auto end() const {
|
||||
return iterator(fragment_type());
|
||||
}
|
||||
|
||||
bool operator==(const basic_atomic_cell_value_view& other) const noexcept {
|
||||
return _value == other._value;
|
||||
}
|
||||
bool operator==(bytes_view bv) const noexcept {
|
||||
return _value == bv;
|
||||
}
|
||||
|
||||
size_t size_bytes() const noexcept {
|
||||
return _value.size();
|
||||
}
|
||||
|
||||
bool empty() const noexcept {
|
||||
return _value.empty();
|
||||
}
|
||||
|
||||
bool is_fragmented() const noexcept {
|
||||
return false;
|
||||
}
|
||||
|
||||
fragment_type first_fragment() const noexcept {
|
||||
return _value;
|
||||
}
|
||||
|
||||
bytes linearize() const {
|
||||
return bytes(_value.begin(), _value.end());
|
||||
}
|
||||
|
||||
template<typename Function>
|
||||
decltype(auto) with_linearized(Function&& fn) const {
|
||||
return fn(_value);
|
||||
}
|
||||
|
||||
friend std::ostream& operator<<(std::ostream& os, const basic_atomic_cell_value_view& vv) {
|
||||
return os << vv.first_fragment();
|
||||
}
|
||||
|
||||
};
|
||||
|
||||
using atomic_cell_value_view = basic_atomic_cell_value_view<mutable_view::no>;
|
||||
using atomic_cell_value_mutable_view = basic_atomic_cell_value_view<mutable_view::yes>;
|
||||
|
||||
inline int compare_unsigned(atomic_cell_value_view a, atomic_cell_value_view b)
|
||||
{
|
||||
assert(!a.is_fragmented() && !b.is_fragmented());
|
||||
return compare_unsigned(a.first_fragment(), b.first_fragment());
|
||||
}
|
||||
|
||||
/*
|
||||
* Represents atomic cell layout. Works on serialized form.
|
||||
*
|
||||
@@ -102,11 +204,11 @@ private:
|
||||
return cell;
|
||||
}
|
||||
public:
|
||||
static bytes_view value(bytes_view cell) {
|
||||
return do_get_value(cell);
|
||||
static atomic_cell_value_view value(bytes_view cell) {
|
||||
return atomic_cell_value_view(do_get_value(cell));
|
||||
}
|
||||
static bytes_mutable_view value(bytes_mutable_view cell) {
|
||||
return do_get_value(cell);
|
||||
static atomic_cell_value_mutable_view value(bytes_mutable_view cell) {
|
||||
return atomic_cell_value_mutable_view(do_get_value(cell));
|
||||
}
|
||||
// Can be called on live counter update cells only
|
||||
static int64_t counter_update_value(bytes_view cell) {
|
||||
@@ -232,6 +334,9 @@ public:
|
||||
auto value() const {
|
||||
return atomic_cell_type::value(_data);
|
||||
}
|
||||
bool is_value_fragmented() const {
|
||||
return false;
|
||||
}
|
||||
// Can be called on live counter update cells only
|
||||
int64_t counter_update_value() const {
|
||||
return atomic_cell_type::counter_update_value(_data);
|
||||
@@ -274,6 +379,10 @@ public:
|
||||
friend class atomic_cell;
|
||||
};
|
||||
|
||||
template<mutable_view is_mutable>
|
||||
using basic_atomic_cell_view = std::conditional_t<is_mutable == mutable_view::no,
|
||||
atomic_cell_view, atomic_cell_mutable_view>;
|
||||
|
||||
class atomic_cell_ref final : public atomic_cell_base<managed_bytes&> {
|
||||
public:
|
||||
atomic_cell_ref(managed_bytes& buf) : atomic_cell_base(buf) {}
|
||||
@@ -345,19 +454,18 @@ public:
|
||||
|
||||
class collection_mutation_view {
|
||||
public:
|
||||
bytes_view data;
|
||||
bytes_view serialize() const { return data; }
|
||||
static collection_mutation_view from_bytes(bytes_view v) { return { v }; }
|
||||
// FIXME: encapsulate properly
|
||||
atomic_cell_value_view data;
|
||||
};
|
||||
|
||||
inline
|
||||
collection_mutation::collection_mutation(collection_mutation_view v)
|
||||
: data(v.data) {
|
||||
: data(v.data.linearize()) {
|
||||
}
|
||||
|
||||
inline
|
||||
collection_mutation::operator collection_mutation_view() const {
|
||||
return { data };
|
||||
return { atomic_cell_value_view(bytes_view(data)) };
|
||||
}
|
||||
|
||||
class column_definition;
|
||||
|
||||
@@ -33,13 +33,15 @@ template<>
|
||||
struct appending_hash<collection_mutation_view> {
|
||||
template<typename Hasher>
|
||||
void operator()(Hasher& h, collection_mutation_view cell, const column_definition& cdef) const {
|
||||
cell.data.with_linearized([&] (bytes_view cell_bv) {
|
||||
auto ctype = static_pointer_cast<const collection_type_impl>(cdef.type);
|
||||
auto m_view = ctype->deserialize_mutation_form(cell);
|
||||
auto m_view = ctype->deserialize_mutation_form(cell_bv);
|
||||
::feed_hash(h, m_view.tomb);
|
||||
for (auto&& key_and_value : m_view.cells) {
|
||||
::feed_hash(h, key_and_value.first);
|
||||
::feed_hash(h, key_and_value.second, cdef);
|
||||
}
|
||||
});
|
||||
}
|
||||
};
|
||||
|
||||
@@ -51,7 +53,9 @@ struct appending_hash<atomic_cell_view> {
|
||||
feed_hash(h, cell.timestamp());
|
||||
if (cell.is_live()) {
|
||||
if (cdef.is_counter()) {
|
||||
::feed_hash(h, counter_cell_view(cell));
|
||||
counter_cell_view::with_linearized(cell, [&] (counter_cell_view ccv) {
|
||||
::feed_hash(h, ccv);
|
||||
});
|
||||
return;
|
||||
}
|
||||
if (cell.is_live_and_has_ttl()) {
|
||||
@@ -91,4 +95,16 @@ struct appending_hash<atomic_cell_or_collection> {
|
||||
feed_hash(h, c.as_collection_mutation(), cdef);
|
||||
}
|
||||
}
|
||||
};
|
||||
};
|
||||
|
||||
template<>
|
||||
struct appending_hash<atomic_cell_value_view> {
|
||||
template<typename Hasher>
|
||||
void operator()(Hasher& h, atomic_cell_value_view v) const {
|
||||
using boost::range::for_each;
|
||||
feed_hash(h, v.size_bytes());
|
||||
for_each(v, [&h] (auto&& chk) {
|
||||
h.update(reinterpret_cast<const char*>(chk.data()), chk.size());
|
||||
});
|
||||
}
|
||||
};
|
||||
|
||||
@@ -57,7 +57,7 @@ public:
|
||||
return std::move(data.data);
|
||||
}
|
||||
collection_mutation_view as_collection_mutation() const {
|
||||
return collection_mutation_view{_data};
|
||||
return collection_mutation_view{atomic_cell_value_view(bytes_view(_data))};
|
||||
}
|
||||
bytes_view serialize() const {
|
||||
return _data;
|
||||
|
||||
@@ -47,8 +47,9 @@ private:
|
||||
if (!is_compatible(new_def, old_type, kind)) {
|
||||
return;
|
||||
}
|
||||
cell.data.with_linearized([&] (bytes_view cell_bv) {
|
||||
auto&& ctype = static_pointer_cast<const collection_type_impl>(old_type);
|
||||
auto old_view = ctype->deserialize_mutation_form(cell);
|
||||
auto old_view = ctype->deserialize_mutation_form(cell_bv);
|
||||
|
||||
collection_type_impl::mutation_view new_view;
|
||||
if (old_view.tomb.timestamp > new_def.dropped_at()) {
|
||||
@@ -60,6 +61,7 @@ private:
|
||||
}
|
||||
}
|
||||
dst.apply(new_def, ctype->serialize_mutation_form(std::move(new_view)));
|
||||
});
|
||||
}
|
||||
public:
|
||||
converting_mutation_partition_applier(
|
||||
|
||||
37
counters.cc
37
counters.cc
@@ -78,10 +78,10 @@ std::vector<counter_shard> counter_cell_view::shards_compatible_with_1_7_4() con
|
||||
return sorted_shards;
|
||||
}
|
||||
|
||||
static bool apply_in_place(const column_definition& cdef, atomic_cell_or_collection& dst, atomic_cell_or_collection& src)
|
||||
static bool apply_in_place(const column_definition& cdef, atomic_cell_mutable_view dst, atomic_cell_mutable_view src)
|
||||
{
|
||||
auto dst_ccmv = counter_cell_mutable_view(dst.as_mutable_atomic_cell(cdef));
|
||||
auto src_ccmv = counter_cell_mutable_view(src.as_mutable_atomic_cell(cdef));
|
||||
auto dst_ccmv = counter_cell_mutable_view(dst);
|
||||
auto src_ccmv = counter_cell_mutable_view(src);
|
||||
auto dst_shards = dst_ccmv.shards();
|
||||
auto src_shards = src_ccmv.shards();
|
||||
|
||||
@@ -143,16 +143,22 @@ void counter_cell_view::apply(const column_definition& cdef, atomic_cell_or_coll
|
||||
|
||||
assert(!dst_ac.is_counter_update());
|
||||
assert(!src_ac.is_counter_update());
|
||||
with_linearized(dst_ac, [&] (counter_cell_view dst_ccv) {
|
||||
with_linearized(src_ac, [&] (counter_cell_view src_ccv) {
|
||||
|
||||
if (counter_cell_view(dst_ac).shard_count() >= counter_cell_view(src_ac).shard_count()
|
||||
if (dst_ccv.shard_count() >= src_ccv.shard_count()
|
||||
&& dst.can_use_mutable_view() && src.can_use_mutable_view()) {
|
||||
if (apply_in_place(cdef, dst, src)) {
|
||||
return;
|
||||
auto dst_amc = dst.as_mutable_atomic_cell(cdef);
|
||||
auto src_amc = src.as_mutable_atomic_cell(cdef);
|
||||
if (!dst_amc.is_value_fragmented() && !src_amc.is_value_fragmented()) {
|
||||
if (apply_in_place(cdef, dst_amc, src_amc)) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
auto dst_shards = counter_cell_view(dst_ac).shards();
|
||||
auto src_shards = counter_cell_view(src_ac).shards();
|
||||
auto dst_shards = dst_ccv.shards();
|
||||
auto src_shards = src_ccv.shards();
|
||||
|
||||
counter_cell_builder result;
|
||||
combine(dst_shards.begin(), dst_shards.end(), src_shards.begin(), src_shards.end(),
|
||||
@@ -161,7 +167,9 @@ void counter_cell_view::apply(const column_definition& cdef, atomic_cell_or_coll
|
||||
});
|
||||
|
||||
auto cell = result.build(std::max(dst_ac.timestamp(), src_ac.timestamp()));
|
||||
src = std::exchange(dst, atomic_cell_or_collection(*counter_type, cell));
|
||||
src = std::exchange(dst, atomic_cell_or_collection(std::move(cell)));
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
stdx::optional<atomic_cell> counter_cell_view::difference(atomic_cell_view a, atomic_cell_view b)
|
||||
@@ -176,8 +184,10 @@ stdx::optional<atomic_cell> counter_cell_view::difference(atomic_cell_view a, at
|
||||
return { };
|
||||
}
|
||||
|
||||
auto a_shards = counter_cell_view(a).shards();
|
||||
auto b_shards = counter_cell_view(b).shards();
|
||||
return with_linearized(a, [&] (counter_cell_view a_ccv) {
|
||||
return with_linearized(b, [&] (counter_cell_view b_ccv) {
|
||||
auto a_shards = a_ccv.shards();
|
||||
auto b_shards = b_ccv.shards();
|
||||
|
||||
auto a_it = a_shards.begin();
|
||||
auto a_end = a_shards.end();
|
||||
@@ -202,6 +212,8 @@ stdx::optional<atomic_cell> counter_cell_view::difference(atomic_cell_view a, at
|
||||
diff = atomic_cell::make_live(*counter_type, a.timestamp(), bytes_view());
|
||||
}
|
||||
return diff;
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
@@ -239,12 +251,13 @@ void transform_counter_updates_to_shards(mutation& m, const mutation* current_st
|
||||
if (!acv.is_live()) {
|
||||
return; // continue -- we are in lambda
|
||||
}
|
||||
counter_cell_view ccv(acv);
|
||||
counter_cell_view::with_linearized(acv, [&] (counter_cell_view ccv) {
|
||||
auto cs = ccv.local_shard();
|
||||
if (!cs) {
|
||||
return; // continue
|
||||
}
|
||||
shards.emplace_back(std::make_pair(id, counter_shard(*cs)));
|
||||
});
|
||||
});
|
||||
|
||||
transformee.for_each_cell([&] (column_id id, atomic_cell_or_collection& ac_o_c) {
|
||||
|
||||
68
counters.hh
68
counters.hh
@@ -79,7 +79,7 @@ static_assert(std::is_pod<counter_id>::value, "counter_id should be a POD type")
|
||||
|
||||
std::ostream& operator<<(std::ostream& os, const counter_id& id);
|
||||
|
||||
template<typename View>
|
||||
template<mutable_view is_mutable>
|
||||
class basic_counter_shard_view {
|
||||
enum class offset : unsigned {
|
||||
id = 0u,
|
||||
@@ -88,7 +88,8 @@ class basic_counter_shard_view {
|
||||
total_size = unsigned(logical_clock) + sizeof(int64_t),
|
||||
};
|
||||
private:
|
||||
typename View::pointer _base;
|
||||
using pointer_type = std::conditional_t<is_mutable == mutable_view::no, const signed char*, signed char*>;
|
||||
pointer_type _base;
|
||||
private:
|
||||
template<typename T>
|
||||
T read(offset off) const {
|
||||
@@ -100,7 +101,7 @@ public:
|
||||
static constexpr auto size = size_t(offset::total_size);
|
||||
public:
|
||||
basic_counter_shard_view() = default;
|
||||
explicit basic_counter_shard_view(typename View::pointer ptr) noexcept
|
||||
explicit basic_counter_shard_view(pointer_type ptr) noexcept
|
||||
: _base(ptr) { }
|
||||
|
||||
counter_id id() const { return read<counter_id>(offset::id); }
|
||||
@@ -111,7 +112,7 @@ public:
|
||||
static constexpr size_t off = size_t(offset::value);
|
||||
static constexpr size_t size = size_t(offset::total_size) - off;
|
||||
|
||||
typename View::value_type tmp[size];
|
||||
signed char tmp[size];
|
||||
std::copy_n(_base + off, size, tmp);
|
||||
std::copy_n(other._base + off, size, _base + off);
|
||||
std::copy_n(tmp, size, other._base + off);
|
||||
@@ -138,7 +139,7 @@ public:
|
||||
};
|
||||
};
|
||||
|
||||
using counter_shard_view = basic_counter_shard_view<bytes_view>;
|
||||
using counter_shard_view = basic_counter_shard_view<mutable_view::no>;
|
||||
|
||||
std::ostream& operator<<(std::ostream& os, counter_shard_view csv);
|
||||
|
||||
@@ -287,28 +288,32 @@ public:
|
||||
// <counter_id> := <int64_t><int64_t>
|
||||
// <shard> := <counter_id><int64_t:value><int64_t:logical_clock>
|
||||
// <counter_cell> := <shard>*
|
||||
template<typename View>
|
||||
template<mutable_view is_mutable>
|
||||
class basic_counter_cell_view {
|
||||
protected:
|
||||
atomic_cell_base<View> _cell;
|
||||
using linearized_value_view = std::conditional_t<is_mutable == mutable_view::no,
|
||||
bytes_view, bytes_mutable_view>;
|
||||
using pointer_type = typename linearized_value_view::pointer;
|
||||
basic_atomic_cell_view<is_mutable> _cell;
|
||||
linearized_value_view _value;
|
||||
private:
|
||||
class shard_iterator : public std::iterator<std::input_iterator_tag, basic_counter_shard_view<View>> {
|
||||
typename View::pointer _current;
|
||||
basic_counter_shard_view<View> _current_view;
|
||||
class shard_iterator : public std::iterator<std::input_iterator_tag, basic_counter_shard_view<is_mutable>> {
|
||||
pointer_type _current;
|
||||
basic_counter_shard_view<is_mutable> _current_view;
|
||||
public:
|
||||
shard_iterator() = default;
|
||||
shard_iterator(typename View::pointer ptr) noexcept
|
||||
shard_iterator(pointer_type ptr) noexcept
|
||||
: _current(ptr), _current_view(ptr) { }
|
||||
|
||||
basic_counter_shard_view<View>& operator*() noexcept {
|
||||
basic_counter_shard_view<is_mutable>& operator*() noexcept {
|
||||
return _current_view;
|
||||
}
|
||||
basic_counter_shard_view<View>* operator->() noexcept {
|
||||
basic_counter_shard_view<is_mutable>* operator->() noexcept {
|
||||
return &_current_view;
|
||||
}
|
||||
shard_iterator& operator++() noexcept {
|
||||
_current += counter_shard_view::size;
|
||||
_current_view = basic_counter_shard_view<View>(_current);
|
||||
_current_view = basic_counter_shard_view<is_mutable>(_current);
|
||||
return *this;
|
||||
}
|
||||
shard_iterator operator++(int) noexcept {
|
||||
@@ -318,7 +323,7 @@ private:
|
||||
}
|
||||
shard_iterator& operator--() noexcept {
|
||||
_current -= counter_shard_view::size;
|
||||
_current_view = basic_counter_shard_view<View>(_current);
|
||||
_current_view = basic_counter_shard_view<is_mutable>(_current);
|
||||
return *this;
|
||||
}
|
||||
shard_iterator operator--(int) noexcept {
|
||||
@@ -335,22 +340,23 @@ private:
|
||||
};
|
||||
public:
|
||||
boost::iterator_range<shard_iterator> shards() const {
|
||||
auto bv = _cell.value();
|
||||
auto begin = shard_iterator(bv.data());
|
||||
auto end = shard_iterator(bv.data() + bv.size());
|
||||
auto begin = shard_iterator(_value.data());
|
||||
auto end = shard_iterator(_value.data() + _value.size());
|
||||
return boost::make_iterator_range(begin, end);
|
||||
}
|
||||
|
||||
size_t shard_count() const {
|
||||
return _cell.value().size() / counter_shard_view::size;
|
||||
return _cell.value().size_bytes() / counter_shard_view::size;
|
||||
}
|
||||
public:
|
||||
protected:
|
||||
// ac must be a live counter cell
|
||||
explicit basic_counter_cell_view(atomic_cell_base<View> ac) noexcept : _cell(ac) {
|
||||
explicit basic_counter_cell_view(basic_atomic_cell_view<is_mutable> ac, linearized_value_view vv) noexcept
|
||||
: _cell(ac), _value(vv)
|
||||
{
|
||||
assert(_cell.is_live());
|
||||
assert(!_cell.is_counter_update());
|
||||
}
|
||||
|
||||
public:
|
||||
api::timestamp_type timestamp() const { return _cell.timestamp(); }
|
||||
|
||||
static data_type total_value_type() { return long_type; }
|
||||
@@ -381,9 +387,17 @@ public:
|
||||
}
|
||||
};
|
||||
|
||||
struct counter_cell_view : basic_counter_cell_view<bytes_view> {
|
||||
struct counter_cell_view : basic_counter_cell_view<mutable_view::no> {
|
||||
using basic_counter_cell_view::basic_counter_cell_view;
|
||||
|
||||
template<typename Function>
|
||||
static decltype(auto) with_linearized(basic_atomic_cell_view<mutable_view::no> ac, Function&& fn) {
|
||||
return ac.value().with_linearized([&] (bytes_view value_view) {
|
||||
counter_cell_view ccv(ac, value_view);
|
||||
return fn(ccv);
|
||||
});
|
||||
}
|
||||
|
||||
// Returns counter shards in an order that is compatible with Scylla 1.7.4.
|
||||
std::vector<counter_shard> shards_compatible_with_1_7_4() const;
|
||||
|
||||
@@ -397,9 +411,15 @@ struct counter_cell_view : basic_counter_cell_view<bytes_view> {
|
||||
friend std::ostream& operator<<(std::ostream& os, counter_cell_view ccv);
|
||||
};
|
||||
|
||||
struct counter_cell_mutable_view : basic_counter_cell_view<bytes_mutable_view> {
|
||||
struct counter_cell_mutable_view : basic_counter_cell_view<mutable_view::yes> {
|
||||
using basic_counter_cell_view::basic_counter_cell_view;
|
||||
|
||||
explicit counter_cell_mutable_view(atomic_cell_mutable_view ac) noexcept
|
||||
: basic_counter_cell_view<mutable_view::yes>(ac, ac.value().first_fragment())
|
||||
{
|
||||
assert(!ac.value().is_fragmented());
|
||||
}
|
||||
|
||||
void set_timestamp(api::timestamp_type ts) { _cell.set_timestamp(ts); }
|
||||
};
|
||||
|
||||
|
||||
@@ -113,7 +113,7 @@ public:
|
||||
class contains;
|
||||
|
||||
protected:
|
||||
bytes_view_opt get_value(const schema& schema,
|
||||
std::optional<atomic_cell_value_view> get_value(const schema& schema,
|
||||
const partition_key& key,
|
||||
const clustering_key_prefix& ckey,
|
||||
const row& cells,
|
||||
|
||||
@@ -430,7 +430,7 @@ void statement_restrictions::validate_secondary_index_selections(bool selects_on
|
||||
}
|
||||
}
|
||||
|
||||
static bytes_view_opt do_get_value(const schema& schema,
|
||||
static std::optional<atomic_cell_value_view> do_get_value(const schema& schema,
|
||||
const column_definition& cdef,
|
||||
const partition_key& key,
|
||||
const clustering_key_prefix& ckey,
|
||||
@@ -438,21 +438,21 @@ static bytes_view_opt do_get_value(const schema& schema,
|
||||
gc_clock::time_point now) {
|
||||
switch(cdef.kind) {
|
||||
case column_kind::partition_key:
|
||||
return key.get_component(schema, cdef.component_index());
|
||||
return atomic_cell_value_view(key.get_component(schema, cdef.component_index()));
|
||||
case column_kind::clustering_key:
|
||||
return ckey.get_component(schema, cdef.component_index());
|
||||
return atomic_cell_value_view(ckey.get_component(schema, cdef.component_index()));
|
||||
default:
|
||||
auto cell = cells.find_cell(cdef.id);
|
||||
if (!cell) {
|
||||
return stdx::nullopt;
|
||||
return std::nullopt;
|
||||
}
|
||||
assert(cdef.is_atomic());
|
||||
auto c = cell->as_atomic_cell(cdef);
|
||||
return c.is_dead(now) ? stdx::nullopt : bytes_view_opt(c.value());
|
||||
return c.is_dead(now) ? std::nullopt : std::optional<atomic_cell_value_view>(c.value());
|
||||
}
|
||||
}
|
||||
|
||||
bytes_view_opt single_column_restriction::get_value(const schema& schema,
|
||||
std::optional<atomic_cell_value_view> single_column_restriction::get_value(const schema& schema,
|
||||
const partition_key& key,
|
||||
const clustering_key_prefix& ckey,
|
||||
const row& cells,
|
||||
@@ -472,7 +472,12 @@ bool single_column_restriction::EQ::is_satisfied_by(const schema& schema,
|
||||
auto operand = value(options);
|
||||
if (operand) {
|
||||
auto cell_value = get_value(schema, key, ckey, cells, now);
|
||||
return cell_value && _column_def.type->compare(*operand, *cell_value) == 0;
|
||||
if (!cell_value) {
|
||||
return false;
|
||||
}
|
||||
return cell_value->with_linearized([&] (bytes_view cell_value_bv) {
|
||||
return _column_def.type->compare(*operand, cell_value_bv) == 0;
|
||||
});
|
||||
}
|
||||
return false;
|
||||
}
|
||||
@@ -491,9 +496,11 @@ bool single_column_restriction::IN::is_satisfied_by(const schema& schema,
|
||||
return false;
|
||||
}
|
||||
auto operands = values(options);
|
||||
return cell_value->with_linearized([&] (bytes_view cell_value_bv) {
|
||||
return std::any_of(operands.begin(), operands.end(), [&] (auto&& operand) {
|
||||
return operand && _column_def.type->compare(*operand, *cell_value) == 0;
|
||||
return operand && _column_def.type->compare(*operand, cell_value_bv) == 0;
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
static query::range<bytes_view> to_range(const term_slice& slice, const query_options& options) {
|
||||
@@ -526,7 +533,9 @@ bool single_column_restriction::slice::is_satisfied_by(const schema& schema,
|
||||
if (!cell_value) {
|
||||
return false;
|
||||
}
|
||||
return to_range(_slice, options).contains(*cell_value, _column_def.type->as_tri_comparator());
|
||||
return cell_value->with_linearized([&] (bytes_view cell_value_bv) {
|
||||
return to_range(_slice, options).contains(cell_value_bv, _column_def.type->as_tri_comparator());
|
||||
});
|
||||
}
|
||||
|
||||
bool single_column_restriction::contains::is_satisfied_by(const schema& schema,
|
||||
@@ -552,7 +561,8 @@ bool single_column_restriction::contains::is_satisfied_by(const schema& schema,
|
||||
auto&& element_type = col_type->is_set() ? col_type->name_comparator() : col_type->value_comparator();
|
||||
if (_column_def.type->is_multi_cell()) {
|
||||
auto cell = cells.find_cell(_column_def.id);
|
||||
auto&& elements = col_type->deserialize_mutation_form(cell->as_collection_mutation()).cells;
|
||||
return cell->as_collection_mutation().data.with_linearized([&] (bytes_view collection_bv) {
|
||||
auto&& elements = col_type->deserialize_mutation_form(collection_bv).cells;
|
||||
auto end = std::remove_if(elements.begin(), elements.end(), [now] (auto&& element) {
|
||||
return element.second.is_dead(now);
|
||||
});
|
||||
@@ -562,7 +572,9 @@ bool single_column_restriction::contains::is_satisfied_by(const schema& schema,
|
||||
continue;
|
||||
}
|
||||
auto found = std::find_if(elements.begin(), end, [&] (auto&& element) {
|
||||
return element_type->compare(element.second.value(), *val) == 0;
|
||||
return element.second.value().with_linearized([&] (bytes_view value_bv) {
|
||||
return element_type->compare(value_bv, *val) == 0;
|
||||
});
|
||||
});
|
||||
if (found == end) {
|
||||
return false;
|
||||
@@ -589,16 +601,26 @@ bool single_column_restriction::contains::is_satisfied_by(const schema& schema,
|
||||
auto found = std::find_if(elements.begin(), end, [&] (auto&& element) {
|
||||
return map_key_type->compare(element.first, *map_key) == 0;
|
||||
});
|
||||
if (found == end || element_type->compare(found->second.value(), *map_value) != 0) {
|
||||
if (found == end) {
|
||||
return false;
|
||||
}
|
||||
auto cmp = found->second.value().with_linearized([&] (bytes_view value_bv) {
|
||||
return element_type->compare(value_bv, *map_value);
|
||||
});
|
||||
if (cmp != 0) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
return true;
|
||||
});
|
||||
} else {
|
||||
auto cell_value = get_value(schema, key, ckey, cells, now);
|
||||
if (!cell_value) {
|
||||
return false;
|
||||
}
|
||||
auto deserialized = _column_def.type->deserialize(*cell_value);
|
||||
auto deserialized = cell_value->with_linearized([&] (bytes_view cell_value_bv) {
|
||||
return _column_def.type->deserialize(cell_value_bv);
|
||||
});
|
||||
for (auto&& value : _values) {
|
||||
auto val = value->bind_and_get(options);
|
||||
if (!val) {
|
||||
@@ -669,7 +691,9 @@ bool token_restriction::EQ::is_satisfied_by(const schema& schema,
|
||||
for (auto&& operand : values(options)) {
|
||||
if (operand) {
|
||||
auto cell_value = do_get_value(schema, **cdef, key, ckey, cells, now);
|
||||
satisfied = cell_value && (*cdef)->type->compare(*operand, *cell_value) == 0;
|
||||
satisfied = cell_value && cell_value->with_linearized([&] (bytes_view cell_value_bv) {
|
||||
return (*cdef)->type->compare(*operand, cell_value_bv) == 0;
|
||||
});
|
||||
}
|
||||
if (!satisfied) {
|
||||
break;
|
||||
@@ -691,7 +715,9 @@ bool token_restriction::slice::is_satisfied_by(const schema& schema,
|
||||
if (!cell_value) {
|
||||
return false;
|
||||
}
|
||||
satisfied = range.contains(*cell_value, cdef->type->as_tri_comparator());
|
||||
satisfied = cell_value->with_linearized([&] (bytes_view cell_value_bv) {
|
||||
return range.contains(cell_value_bv, cdef->type->as_tri_comparator());
|
||||
});
|
||||
if (!satisfied) {
|
||||
break;
|
||||
}
|
||||
|
||||
@@ -239,18 +239,18 @@ void batch_statement::verify_batch_size(const std::vector<mutation>& mutations)
|
||||
public:
|
||||
void accept_partition_tombstone(tombstone) override {}
|
||||
void accept_static_cell(column_id, atomic_cell_view v) override {
|
||||
size += v.value().size();
|
||||
size += v.value().size_bytes();
|
||||
}
|
||||
void accept_static_cell(column_id, collection_mutation_view v) override {
|
||||
size += v.data.size();
|
||||
size += v.data.size_bytes();
|
||||
}
|
||||
void accept_row_tombstone(const range_tombstone&) override {}
|
||||
void accept_row(position_in_partition_view, const row_tombstone&, const row_marker&, is_dummy, is_continuous) override {}
|
||||
void accept_row_cell(column_id, atomic_cell_view v) override {
|
||||
size += v.value().size();
|
||||
size += v.value().size_bytes();
|
||||
}
|
||||
void accept_row_cell(column_id id, collection_mutation_view v) override {
|
||||
size += v.data.size();
|
||||
size += v.data.size_bytes();
|
||||
}
|
||||
|
||||
size_t size = 0;
|
||||
|
||||
@@ -3681,7 +3681,7 @@ std::ostream&
|
||||
operator<<(std::ostream& os, const atomic_cell_view& acv) {
|
||||
if (acv.is_live()) {
|
||||
return fprint(os, "atomic_cell{%s;ts=%d;expiry=%d,ttl=%d}",
|
||||
to_hex(acv.value()),
|
||||
to_hex(acv.value().linearize()),
|
||||
acv.timestamp(),
|
||||
acv.is_live_and_has_ttl() ? acv.expiry().time_since_epoch().count() : -1,
|
||||
acv.is_live_and_has_ttl() ? acv.ttl().count() : 0);
|
||||
|
||||
@@ -318,7 +318,8 @@ row_marker view_updates::compute_row_marker(const clustering_row& base_row) cons
|
||||
}
|
||||
|
||||
deletable_row& view_updates::get_view_row(const partition_key& base_key, const clustering_row& update) {
|
||||
auto get_value = boost::adaptors::transformed([&, this] (const column_definition& cdef) {
|
||||
std::vector<bytes> linearized_values;
|
||||
auto get_value = boost::adaptors::transformed([&, this] (const column_definition& cdef) -> bytes_view {
|
||||
auto* base_col = _base->get_column_definition(cdef.name());
|
||||
assert(base_col);
|
||||
switch (base_col->kind) {
|
||||
@@ -328,10 +329,11 @@ deletable_row& view_updates::get_view_row(const partition_key& base_key, const c
|
||||
return update.key().get_component(*_base, base_col->position());
|
||||
default:
|
||||
auto& c = update.cells().cell_at(base_col->id);
|
||||
if (base_col->is_atomic()) {
|
||||
return c.as_atomic_cell(cdef).value();
|
||||
auto value_view = base_col->is_atomic() ? c.as_atomic_cell(cdef).value() : c.as_collection_mutation().data;
|
||||
if (value_view.is_fragmented()) {
|
||||
return linearized_values.emplace_back(value_view.linearize());
|
||||
}
|
||||
return c.as_collection_mutation().data;
|
||||
return value_view.first_fragment();
|
||||
}
|
||||
});
|
||||
auto& partition = partition_for(partition_key::from_range(_view->partition_key_columns() | get_value));
|
||||
|
||||
@@ -186,11 +186,13 @@ private:
|
||||
update(item.as_atomic_cell(col));
|
||||
} else {
|
||||
auto ctype = static_pointer_cast<const collection_type_impl>(col.type);
|
||||
auto mview = ctype->deserialize_mutation_form(item.as_collection_mutation());
|
||||
item.as_collection_mutation().data.with_linearized([&] (bytes_view bv) {
|
||||
auto mview = ctype->deserialize_mutation_form(bv);
|
||||
update(mview.tomb);
|
||||
for (auto& entry : mview.cells) {
|
||||
update(entry.second);
|
||||
}
|
||||
});
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@@ -595,7 +595,7 @@ void write_cell(RowWriter& w, const query::partition_slice& slice, ::atomic_cell
|
||||
} else {
|
||||
return std::move(wr).skip_expiry();
|
||||
}
|
||||
}().write_value(c.value());
|
||||
}().write_fragmented_value(c.value());
|
||||
[&, wr = std::move(after_value)] () mutable {
|
||||
if (slice.options.contains<query::partition_slice::option::send_ttl>() && c.is_live_and_has_ttl()) {
|
||||
return std::move(wr).write_ttl(c.ttl());
|
||||
@@ -621,6 +621,7 @@ void write_cell(RowWriter& w, const query::partition_slice& slice, const data_ty
|
||||
template<typename RowWriter>
|
||||
void write_counter_cell(RowWriter& w, const query::partition_slice& slice, ::atomic_cell_view c) {
|
||||
assert(c.is_live());
|
||||
counter_cell_view::with_linearized(c, [&] (counter_cell_view ccv) {
|
||||
auto wr = w.add().write();
|
||||
[&, wr = std::move(wr)] () mutable {
|
||||
if (slice.options.contains<query::partition_slice::option::send_timestamp>()) {
|
||||
@@ -629,9 +630,10 @@ void write_counter_cell(RowWriter& w, const query::partition_slice& slice, ::ato
|
||||
return std::move(wr).skip_timestamp();
|
||||
}
|
||||
}().skip_expiry()
|
||||
.write_value(counter_cell_view::total_value_type()->decompose(counter_cell_view(c).total_value()))
|
||||
.write_value(counter_cell_view::total_value_type()->decompose(ccv.total_value()))
|
||||
.skip_ttl()
|
||||
.end_qr_cell();
|
||||
});
|
||||
}
|
||||
|
||||
// Used to return the timestamp of the latest update to the row
|
||||
@@ -1624,7 +1626,8 @@ bool row::compact_and_expire(
|
||||
} else {
|
||||
auto&& cell = c.as_collection_mutation();
|
||||
auto&& ctype = static_pointer_cast<const collection_type_impl>(def.type);
|
||||
auto m_view = ctype->deserialize_mutation_form(cell);
|
||||
cell.data.with_linearized([&] (bytes_view cell_bv) {
|
||||
auto m_view = ctype->deserialize_mutation_form(cell_bv);
|
||||
collection_type_impl::mutation m = m_view.materialize(*ctype);
|
||||
any_live |= m.compact_and_expire(tomb, query_time, can_gc, gc_before);
|
||||
if (m.cells.empty() && m.tomb <= tomb.tomb()) {
|
||||
@@ -1632,6 +1635,7 @@ bool row::compact_and_expire(
|
||||
} else {
|
||||
c = ctype->serialize_mutation_form(m);
|
||||
}
|
||||
});
|
||||
}
|
||||
return erase;
|
||||
});
|
||||
|
||||
@@ -44,7 +44,7 @@ template<typename Writer>
|
||||
auto write_live_cell(Writer&& writer, atomic_cell_view c)
|
||||
{
|
||||
return std::move(writer).write_created_at(c.timestamp())
|
||||
.write_value(c.value())
|
||||
.write_fragmented_value(c.value())
|
||||
.end_live_cell();
|
||||
}
|
||||
|
||||
@@ -59,7 +59,7 @@ auto write_counter_cell(Writer&& writer, atomic_cell_view c)
|
||||
.write_delta(delta)
|
||||
.end_counter_cell_update();
|
||||
} else {
|
||||
counter_cell_view ccv(c);
|
||||
return counter_cell_view::with_linearized(c, [&] (counter_cell_view ccv) {
|
||||
auto shards = std::move(value).start_value_counter_cell_full()
|
||||
.start_shards();
|
||||
if (service::get_local_storage_service().cluster_supports_correct_counter_order()) {
|
||||
@@ -72,6 +72,7 @@ auto write_counter_cell(Writer&& writer, atomic_cell_view c)
|
||||
}
|
||||
}
|
||||
return std::move(shards).end_shards().end_counter_cell_full();
|
||||
});
|
||||
}
|
||||
}().end_counter_cell();
|
||||
}
|
||||
@@ -83,7 +84,7 @@ auto write_expiring_cell(Writer&& writer, atomic_cell_view c)
|
||||
.write_expiry(c.expiry())
|
||||
.start_c()
|
||||
.write_created_at(c.timestamp())
|
||||
.write_value(c.value())
|
||||
.write_fragmented_value(c.value())
|
||||
.end_c()
|
||||
.end_expiring_cell();
|
||||
}
|
||||
@@ -101,8 +102,9 @@ auto write_dead_cell(Writer&& writer, atomic_cell_view c)
|
||||
template<typename Writer>
|
||||
auto write_collection_cell(Writer&& collection_writer, collection_mutation_view cmv, const column_definition& def)
|
||||
{
|
||||
return cmv.data.with_linearized([&] (bytes_view cmv_bv) {
|
||||
auto&& ctype = static_pointer_cast<const collection_type_impl>(def.type);
|
||||
auto m_view = ctype->deserialize_mutation_form(cmv);
|
||||
auto m_view = ctype->deserialize_mutation_form(cmv_bv);
|
||||
auto cells_writer = std::move(collection_writer).write_tomb(m_view.tomb).start_elements();
|
||||
for (auto&& c : m_view.cells) {
|
||||
auto cell_writer = cells_writer.add().write_key(c.first);
|
||||
@@ -115,6 +117,7 @@ auto write_collection_cell(Writer&& collection_writer, collection_mutation_view
|
||||
}
|
||||
}
|
||||
return std::move(cells_writer).end_elements().end_collection_cell();
|
||||
});
|
||||
}
|
||||
|
||||
template<typename Writer>
|
||||
|
||||
@@ -108,7 +108,7 @@ collection_mutation read_collection_cell(const collection_type_impl& ctype, ser:
|
||||
for (auto&& e : elements) {
|
||||
mut.cells.emplace_back(e.key(), read_atomic_cell(*ctype.value_comparator(), e.value()));
|
||||
}
|
||||
return collection_type_impl::serialize_mutation_form(mut);
|
||||
return ctype.serialize_mutation_form(mut);
|
||||
}
|
||||
|
||||
template<typename Visitor>
|
||||
|
||||
@@ -29,6 +29,7 @@
|
||||
#include <unordered_map>
|
||||
#include <type_traits>
|
||||
#include <deque>
|
||||
#include "atomic_cell.hh"
|
||||
|
||||
namespace sstables {
|
||||
|
||||
@@ -67,6 +68,11 @@ struct disk_string_view {
|
||||
bytes_view value;
|
||||
};
|
||||
|
||||
template<typename SizeType>
|
||||
struct disk_data_value_view {
|
||||
atomic_cell_value_view value;
|
||||
};
|
||||
|
||||
template <typename Size, typename Members>
|
||||
struct disk_array {
|
||||
static_assert(std::is_integral<Size>::value, "Length type must be convertible to integer");
|
||||
|
||||
@@ -378,6 +378,15 @@ inline void write(sstable_version_types v, file_writer& out, const disk_string_v
|
||||
write(v, out, len, s.value);
|
||||
}
|
||||
|
||||
template<typename SizeType>
|
||||
inline void write(sstable_version_types ver, file_writer& out, const disk_data_value_view<SizeType>& v) {
|
||||
SizeType length;
|
||||
check_truncate_and_assign(length, v.value.size_bytes());
|
||||
write(ver, out, length);
|
||||
using boost::range::for_each;
|
||||
for_each(v.value, [&] (bytes_view fragment) { write(ver, out, fragment); });
|
||||
}
|
||||
|
||||
// We cannot simply read the whole array at once, because we don't know its
|
||||
// full size. We know the number of elements, but if we are talking about
|
||||
// disk_strings, for instance, we have no idea how much of the stream each
|
||||
@@ -1712,6 +1721,16 @@ void write_cell_value(file_writer& out, const abstract_type& type, bytes_view va
|
||||
}
|
||||
}
|
||||
|
||||
void write_cell_value(file_writer& out, const abstract_type& type, atomic_cell_value_view value) {
|
||||
if (!value.empty()) {
|
||||
if (!type.value_length_if_fixed()) {
|
||||
write_vint(out, value.size_bytes());
|
||||
}
|
||||
using boost::range::for_each;
|
||||
for_each(value, [&] (bytes_view fragment) { write(sstable_version_types::mc, out, fragment); });
|
||||
}
|
||||
}
|
||||
|
||||
static inline void update_cell_stats(column_stats& c_stats, api::timestamp_type timestamp) {
|
||||
c_stats.update_timestamp(timestamp);
|
||||
c_stats.cells_count++;
|
||||
@@ -1770,19 +1789,20 @@ void sstable::write_cell(file_writer& out, atomic_cell_view cell, const column_d
|
||||
column_mask mask = column_mask::counter;
|
||||
write(_version, out, mask, int64_t(0), timestamp);
|
||||
|
||||
counter_cell_view ccv(cell);
|
||||
counter_cell_view::with_linearized(cell, [&] (counter_cell_view ccv) {
|
||||
write_counter_value(ccv, out, _version, [v = _version] (file_writer& out, uint32_t value) {
|
||||
return write(v, out, value);
|
||||
});
|
||||
|
||||
_c_stats.update_local_deletion_time(std::numeric_limits<int>::max());
|
||||
});
|
||||
} else if (cell.is_live_and_has_ttl()) {
|
||||
// expiring cell
|
||||
|
||||
column_mask mask = column_mask::expiration;
|
||||
uint32_t ttl = cell.ttl().count();
|
||||
uint32_t expiration = cell.expiry().time_since_epoch().count();
|
||||
disk_string_view<uint32_t> cell_value { cell.value() };
|
||||
disk_data_value_view<uint32_t> cell_value { cell.value() };
|
||||
|
||||
_c_stats.update_local_deletion_time(expiration);
|
||||
// tombstone histogram is updated with expiration time because if ttl is longer
|
||||
@@ -1795,7 +1815,7 @@ void sstable::write_cell(file_writer& out, atomic_cell_view cell, const column_d
|
||||
// regular cell
|
||||
|
||||
column_mask mask = column_mask::none;
|
||||
disk_string_view<uint32_t> cell_value { cell.value() };
|
||||
disk_data_value_view<uint32_t> cell_value { cell.value() };
|
||||
|
||||
_c_stats.update_local_deletion_time(std::numeric_limits<int>::max());
|
||||
|
||||
@@ -1884,8 +1904,9 @@ void sstable::write_range_tombstone(file_writer& out,
|
||||
}
|
||||
|
||||
void sstable::write_collection(file_writer& out, const composite& clustering_key, const column_definition& cdef, collection_mutation_view collection) {
|
||||
collection.data.with_linearized([&] (bytes_view collection_bv) {
|
||||
auto t = static_pointer_cast<const collection_type_impl>(cdef.type);
|
||||
auto mview = t->deserialize_mutation_form(collection);
|
||||
auto mview = t->deserialize_mutation_form(collection_bv);
|
||||
const bytes& column_name = cdef.name();
|
||||
if (mview.tomb) {
|
||||
write_range_tombstone(out, clustering_key, composite::eoc::start, clustering_key, composite::eoc::end, { column_name }, mview.tomb);
|
||||
@@ -1894,6 +1915,7 @@ void sstable::write_collection(file_writer& out, const composite& clustering_key
|
||||
index_and_write_column_name(out, clustering_key, { column_name, cp.first });
|
||||
write_cell(out, cp.second, cdef);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
// This function is about writing a clustered_row to data file according to SSTables format.
|
||||
@@ -2904,10 +2926,11 @@ void sstable_writer_m::write_cell(file_writer& writer, atomic_cell_view cell, co
|
||||
if (has_value) {
|
||||
if (cdef.is_counter()) {
|
||||
assert(!cell.is_counter_update());
|
||||
counter_cell_view ccv(cell);
|
||||
counter_cell_view::with_linearized(cell, [&] (counter_cell_view ccv) {
|
||||
write_counter_value(ccv, writer, sstable_version_types::mc, [] (file_writer& out, uint32_t value) {
|
||||
return write_vint(out, value);
|
||||
});
|
||||
});
|
||||
} else {
|
||||
write_cell_value(writer, *cdef.type, cell.value());
|
||||
}
|
||||
@@ -2954,7 +2977,8 @@ void sstable_writer_m::write_liveness_info(file_writer& writer, const row_marker
|
||||
void sstable_writer_m::write_collection(file_writer& writer, const column_definition& cdef,
|
||||
collection_mutation_view collection, const row_time_properties& properties, bool has_complex_deletion) {
|
||||
auto& ctype = *static_pointer_cast<const collection_type_impl>(cdef.type);
|
||||
auto mview = ctype.deserialize_mutation_form(collection);
|
||||
collection.data.with_linearized([&] (bytes_view collection_bv) {
|
||||
auto mview = ctype.deserialize_mutation_form(collection_bv);
|
||||
if (has_complex_deletion) {
|
||||
auto dt = to_deletion_time(mview.tomb);
|
||||
write_delta_deletion_time(writer, dt);
|
||||
@@ -2972,6 +2996,7 @@ void sstable_writer_m::write_collection(file_writer& writer, const column_defini
|
||||
++_c_stats.cells_count;
|
||||
write_cell(writer, cell, cdef, properties, cell_path);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
void sstable_writer_m::write_cells(file_writer& writer, column_kind kind, const row& row_body,
|
||||
@@ -3071,12 +3096,14 @@ static bool row_has_complex_deletion(const schema& s, const row& r) {
|
||||
return stop_iteration::no;
|
||||
}
|
||||
auto t = static_pointer_cast<const collection_type_impl>(cdef.type);
|
||||
auto mview = t->deserialize_mutation_form(c.as_collection_mutation());
|
||||
return c.as_collection_mutation().data.with_linearized([&] (bytes_view c_bv) {
|
||||
auto mview = t->deserialize_mutation_form(c_bv);
|
||||
if (mview.tomb) {
|
||||
result = true;
|
||||
}
|
||||
return stop_iteration(static_cast<bool>(mview.tomb));
|
||||
});
|
||||
});
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
@@ -67,24 +67,28 @@ SEASTAR_TEST_CASE(test_counter_cell) {
|
||||
b1.add_shard(counter_shard(id[0], 5, 1));
|
||||
b1.add_shard(counter_shard(id[1], -4, 1));
|
||||
auto c1 = atomic_cell_or_collection(b1.build(0));
|
||||
|
||||
auto cv = counter_cell_view(c1.as_atomic_cell(cdef));
|
||||
|
||||
atomic_cell_or_collection c2;
|
||||
counter_cell_view::with_linearized(c1.as_atomic_cell(cdef), [&] (counter_cell_view cv) {
|
||||
BOOST_REQUIRE_EQUAL(cv.total_value(), 1);
|
||||
verify_shard_order(cv);
|
||||
|
||||
counter_cell_builder b2;
|
||||
b2.add_shard(counter_shard(*cv.get_shard(id[0])).update(2, 1));
|
||||
b2.add_shard(counter_shard(id[2], 1, 1));
|
||||
auto c2 = atomic_cell_or_collection(b2.build(0));
|
||||
c2 = atomic_cell_or_collection(b2.build(0));
|
||||
});
|
||||
|
||||
cv = counter_cell_view(c2.as_atomic_cell(cdef));
|
||||
counter_cell_view::with_linearized(c2.as_atomic_cell(cdef), [&] (counter_cell_view cv) {
|
||||
BOOST_REQUIRE_EQUAL(cv.total_value(), 8);
|
||||
verify_shard_order(cv);
|
||||
});
|
||||
|
||||
counter_cell_view::apply(cdef, c1, c2);
|
||||
cv = counter_cell_view(c1.as_atomic_cell(cdef));
|
||||
counter_cell_view::with_linearized(c1.as_atomic_cell(cdef), [&] (counter_cell_view cv) {
|
||||
BOOST_REQUIRE_EQUAL(cv.total_value(), 4);
|
||||
verify_shard_order(cv);
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
@@ -97,9 +101,10 @@ SEASTAR_TEST_CASE(test_apply) {
|
||||
auto src = b.copy(*cdef.type);
|
||||
counter_cell_view::apply(cdef, dst, src);
|
||||
|
||||
auto cv = counter_cell_view(dst.as_atomic_cell(cdef));
|
||||
counter_cell_view::with_linearized(dst.as_atomic_cell(cdef), [&] (counter_cell_view cv) {
|
||||
BOOST_REQUIRE_EQUAL(cv.total_value(), value);
|
||||
BOOST_REQUIRE_EQUAL(cv.timestamp(), std::max(dst.as_atomic_cell(cdef).timestamp(), src.as_atomic_cell(cdef).timestamp()));
|
||||
});
|
||||
};
|
||||
auto id = generate_ids(5);
|
||||
|
||||
@@ -235,15 +240,17 @@ SEASTAR_TEST_CASE(test_counter_mutations) {
|
||||
m.apply(m2);
|
||||
auto ac = get_counter_cell(m);
|
||||
BOOST_REQUIRE(ac.is_live());
|
||||
counter_cell_view ccv { ac };
|
||||
counter_cell_view::with_linearized(ac, [&] (counter_cell_view ccv) {
|
||||
BOOST_REQUIRE_EQUAL(ccv.total_value(), -102);
|
||||
verify_shard_order(ccv);
|
||||
});
|
||||
|
||||
ac = get_static_counter_cell(m);
|
||||
BOOST_REQUIRE(ac.is_live());
|
||||
ccv = counter_cell_view(ac);
|
||||
counter_cell_view::with_linearized(ac, [&] (counter_cell_view ccv) {
|
||||
BOOST_REQUIRE_EQUAL(ccv.total_value(), 20);
|
||||
verify_shard_order(ccv);
|
||||
});
|
||||
|
||||
m.apply(m3);
|
||||
ac = get_counter_cell(m);
|
||||
@@ -263,28 +270,32 @@ SEASTAR_TEST_CASE(test_counter_mutations) {
|
||||
m = mutation(s, m1.decorated_key(), m1.partition().difference(s, m2.partition()));
|
||||
ac = get_counter_cell(m);
|
||||
BOOST_REQUIRE(ac.is_live());
|
||||
ccv = counter_cell_view(ac);
|
||||
counter_cell_view::with_linearized(ac, [&] (counter_cell_view ccv) {
|
||||
BOOST_REQUIRE_EQUAL(ccv.total_value(), 2);
|
||||
verify_shard_order(ccv);
|
||||
});
|
||||
|
||||
ac = get_static_counter_cell(m);
|
||||
BOOST_REQUIRE(ac.is_live());
|
||||
ccv = counter_cell_view(ac);
|
||||
counter_cell_view::with_linearized(ac, [&] (counter_cell_view ccv) {
|
||||
BOOST_REQUIRE_EQUAL(ccv.total_value(), 11);
|
||||
verify_shard_order(ccv);
|
||||
});
|
||||
|
||||
m = mutation(s, m1.decorated_key(), m2.partition().difference(s, m1.partition()));
|
||||
ac = get_counter_cell(m);
|
||||
BOOST_REQUIRE(ac.is_live());
|
||||
ccv = counter_cell_view(ac);
|
||||
counter_cell_view::with_linearized(ac, [&] (counter_cell_view ccv) {
|
||||
BOOST_REQUIRE_EQUAL(ccv.total_value(), -105);
|
||||
verify_shard_order(ccv);
|
||||
});
|
||||
|
||||
ac = get_static_counter_cell(m);
|
||||
BOOST_REQUIRE(ac.is_live());
|
||||
ccv = counter_cell_view(ac);
|
||||
counter_cell_view::with_linearized(ac, [&] (counter_cell_view ccv) {
|
||||
BOOST_REQUIRE_EQUAL(ccv.total_value(), 9);
|
||||
verify_shard_order(ccv);
|
||||
});
|
||||
|
||||
m = mutation(s, m1.decorated_key(), m1.partition().difference(s, m3.partition()));
|
||||
BOOST_REQUIRE_EQUAL(m.partition().clustered_rows().calculate_size(), 0);
|
||||
@@ -420,30 +431,34 @@ SEASTAR_TEST_CASE(test_transfer_updates_to_shards) {
|
||||
|
||||
auto ac = get_counter_cell(m);
|
||||
BOOST_REQUIRE(ac.is_live());
|
||||
auto ccv = counter_cell_view(ac);
|
||||
counter_cell_view::with_linearized(ac, [&] (counter_cell_view ccv) {
|
||||
BOOST_REQUIRE_EQUAL(ccv.total_value(), 5);
|
||||
verify_shard_order(ccv);
|
||||
});
|
||||
|
||||
ac = get_static_counter_cell(m);
|
||||
BOOST_REQUIRE(ac.is_live());
|
||||
ccv = counter_cell_view(ac);
|
||||
counter_cell_view::with_linearized(ac, [&] (counter_cell_view ccv) {
|
||||
BOOST_REQUIRE_EQUAL(ccv.total_value(), 4);
|
||||
verify_shard_order(ccv);
|
||||
});
|
||||
|
||||
m = m2;
|
||||
transform_counter_updates_to_shards(m, &m0, 0);
|
||||
|
||||
ac = get_counter_cell(m);
|
||||
BOOST_REQUIRE(ac.is_live());
|
||||
ccv = counter_cell_view(ac);
|
||||
counter_cell_view::with_linearized(ac, [&] (counter_cell_view ccv) {
|
||||
BOOST_REQUIRE_EQUAL(ccv.total_value(), 14);
|
||||
verify_shard_order(ccv);
|
||||
});
|
||||
|
||||
ac = get_static_counter_cell(m);
|
||||
BOOST_REQUIRE(ac.is_live());
|
||||
ccv = counter_cell_view(ac);
|
||||
counter_cell_view::with_linearized(ac, [&] (counter_cell_view ccv) {
|
||||
BOOST_REQUIRE_EQUAL(ccv.total_value(), 12);
|
||||
verify_shard_order(ccv);
|
||||
});
|
||||
|
||||
m = m3;
|
||||
transform_counter_updates_to_shards(m, &m0, 0);
|
||||
@@ -502,13 +517,14 @@ SEASTAR_TEST_CASE(test_sanitize_corrupted_cells) {
|
||||
auto c2 = atomic_cell_or_collection(b2.build(0));
|
||||
|
||||
// Compare
|
||||
auto cv1 = counter_cell_view(c1.as_atomic_cell(cdef));
|
||||
auto cv2 = counter_cell_view(c2.as_atomic_cell(cdef));
|
||||
|
||||
counter_cell_view::with_linearized(c1.as_atomic_cell(cdef), [&] (counter_cell_view cv1) {
|
||||
counter_cell_view::with_linearized(c2.as_atomic_cell(cdef), [&] (counter_cell_view cv2) {
|
||||
BOOST_REQUIRE_EQUAL(cv1, cv2);
|
||||
BOOST_REQUIRE_EQUAL(cv1.total_value(), cv2.total_value());
|
||||
verify_shard_order(cv1);
|
||||
verify_shard_order(cv2);
|
||||
});
|
||||
});
|
||||
}
|
||||
});
|
||||
}
|
||||
@@ -561,7 +577,7 @@ SEASTAR_TEST_CASE(test_shards_compatible_with_1_7_4) {
|
||||
}
|
||||
auto ac = atomic_cell_or_collection(ccb.build(0));
|
||||
|
||||
auto cv = counter_cell_view(ac.as_atomic_cell(cdef));
|
||||
counter_cell_view::with_linearized(ac.as_atomic_cell(cdef), [&] (counter_cell_view cv) {
|
||||
|
||||
verify_shard_order(cv);
|
||||
|
||||
@@ -573,6 +589,7 @@ SEASTAR_TEST_CASE(test_shards_compatible_with_1_7_4) {
|
||||
}
|
||||
previous = cs.id();
|
||||
}
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
@@ -228,12 +228,11 @@ public:
|
||||
if (!col_def->type->is_multi_cell()) {
|
||||
auto c = cell->as_atomic_cell(*col_def);
|
||||
assert(c.is_live());
|
||||
actual = { c.value().begin(), c.value().end() };
|
||||
actual = c.value().linearize();
|
||||
} else {
|
||||
auto c = cell->as_collection_mutation();
|
||||
auto type = dynamic_pointer_cast<const collection_type_impl>(col_def->type);
|
||||
actual = type->to_value(type->deserialize_mutation_form(c),
|
||||
cql_serialization_format::internal());
|
||||
actual = type->to_value(c, cql_serialization_format::internal());
|
||||
}
|
||||
assert(col_def->type->equal(actual, exp));
|
||||
});
|
||||
|
||||
@@ -116,7 +116,7 @@ public:
|
||||
BOOST_FAIL(sprint("Expected static row with column %s, but it is not present", columns[i].name));
|
||||
}
|
||||
auto& cdef = _reader.schema()->static_column_at(columns[i].id);
|
||||
auto cmp = compare_unsigned(columns[i].value, cell->as_atomic_cell(cdef).value());
|
||||
auto cmp = compare_unsigned(columns[i].value, cell->as_atomic_cell(cdef).value().linearize());
|
||||
if (cmp != 0) {
|
||||
BOOST_FAIL(sprint("Expected static row with column %s having value %s, but it has value %s",
|
||||
columns[i].name,
|
||||
@@ -150,7 +150,7 @@ public:
|
||||
BOOST_FAIL(sprint("Expected row with column %s, but it is not present", columns[i].name));
|
||||
}
|
||||
auto& cdef = _reader.schema()->regular_column_at(columns[i].id);
|
||||
auto cmp = compare_unsigned(columns[i].value, cell->as_atomic_cell(cdef).value());
|
||||
auto cmp = compare_unsigned(columns[i].value, cell->as_atomic_cell(cdef).value().linearize());
|
||||
if (cmp != 0) {
|
||||
BOOST_FAIL(sprint("Expected row with column %s having value %s, but it has value %s",
|
||||
columns[i].name,
|
||||
|
||||
@@ -75,6 +75,11 @@ static atomic_cell make_atomic_cell(data_type dt, T value) {
|
||||
return atomic_cell::make_live(*dt, 0, dt->decompose(std::move(value)));
|
||||
};
|
||||
|
||||
template<typename T>
|
||||
static atomic_cell make_collection_member(data_type dt, T value) {
|
||||
return atomic_cell::make_live(*dt, 0, dt->decompose(std::move(value)));
|
||||
};
|
||||
|
||||
static mutation_partition get_partition(memtable& mt, const partition_key& key) {
|
||||
auto dk = dht::global_partitioner().decorate_key(*mt.schema(), key);
|
||||
auto reader = mt.make_flat_reader(mt.schema(), dht::partition_range::make_singular(dk));
|
||||
@@ -119,7 +124,7 @@ SEASTAR_TEST_CASE(test_mutation_is_applied) {
|
||||
BOOST_REQUIRE(i);
|
||||
auto cell = i->as_atomic_cell(r1_col);
|
||||
BOOST_REQUIRE(cell.is_live());
|
||||
BOOST_REQUIRE(int32_type->equal(cell.value(), int32_type->decompose(3)));
|
||||
BOOST_REQUIRE(int32_type->equal(cell.value().linearize(), int32_type->decompose(3)));
|
||||
});
|
||||
}
|
||||
|
||||
@@ -207,19 +212,19 @@ SEASTAR_TEST_CASE(test_map_mutations) {
|
||||
auto mt = make_lw_shared<memtable>(s);
|
||||
auto key = partition_key::from_exploded(*s, {to_bytes("key1")});
|
||||
auto& column = *s->get_column_definition("s1");
|
||||
auto mmut1 = make_collection_mutation({}, int32_type->decompose(101), make_atomic_cell(utf8_type, sstring("101")));
|
||||
auto mmut1 = make_collection_mutation({}, int32_type->decompose(101), make_collection_member(utf8_type, sstring("101")));
|
||||
mutation m1(s, key);
|
||||
m1.set_static_cell(column, my_map_type->serialize_mutation_form(mmut1));
|
||||
mt->apply(m1);
|
||||
auto mmut2 = make_collection_mutation({}, int32_type->decompose(102), make_atomic_cell(utf8_type, sstring("102")));
|
||||
auto mmut2 = make_collection_mutation({}, int32_type->decompose(102), make_collection_member(utf8_type, sstring("102")));
|
||||
mutation m2(s, key);
|
||||
m2.set_static_cell(column, my_map_type->serialize_mutation_form(mmut2));
|
||||
mt->apply(m2);
|
||||
auto mmut3 = make_collection_mutation({}, int32_type->decompose(103), make_atomic_cell(utf8_type, sstring("103")));
|
||||
auto mmut3 = make_collection_mutation({}, int32_type->decompose(103), make_collection_member(utf8_type, sstring("103")));
|
||||
mutation m3(s, key);
|
||||
m3.set_static_cell(column, my_map_type->serialize_mutation_form(mmut3));
|
||||
mt->apply(m3);
|
||||
auto mmut2o = make_collection_mutation({}, int32_type->decompose(102), make_atomic_cell(utf8_type, sstring("102 override")));
|
||||
auto mmut2o = make_collection_mutation({}, int32_type->decompose(102), make_collection_member(utf8_type, sstring("102 override")));
|
||||
mutation m2o(s, key);
|
||||
m2o.set_static_cell(column, my_map_type->serialize_mutation_form(mmut2o));
|
||||
mt->apply(m2o);
|
||||
@@ -229,7 +234,8 @@ SEASTAR_TEST_CASE(test_map_mutations) {
|
||||
auto i = r.find_cell(column.id);
|
||||
BOOST_REQUIRE(i);
|
||||
auto cell = i->as_collection_mutation();
|
||||
auto muts = my_map_type->deserialize_mutation_form(cell);
|
||||
auto cell_b = cell.data.linearize();
|
||||
auto muts = my_map_type->deserialize_mutation_form(cell_b);
|
||||
BOOST_REQUIRE(muts.cells.size() == 3);
|
||||
// FIXME: more strict tests
|
||||
});
|
||||
@@ -265,7 +271,8 @@ SEASTAR_TEST_CASE(test_set_mutations) {
|
||||
auto i = r.find_cell(column.id);
|
||||
BOOST_REQUIRE(i);
|
||||
auto cell = i->as_collection_mutation();
|
||||
auto muts = my_set_type->deserialize_mutation_form(cell);
|
||||
auto cell_b = cell.data.linearize();
|
||||
auto muts = my_set_type->deserialize_mutation_form(cell_b);
|
||||
BOOST_REQUIRE(muts.cells.size() == 3);
|
||||
// FIXME: more strict tests
|
||||
});
|
||||
@@ -280,19 +287,19 @@ SEASTAR_TEST_CASE(test_list_mutations) {
|
||||
auto key = partition_key::from_exploded(*s, {to_bytes("key1")});
|
||||
auto& column = *s->get_column_definition("s1");
|
||||
auto make_key = [] { return timeuuid_type->decompose(utils::UUID_gen::get_time_UUID()); };
|
||||
auto mmut1 = make_collection_mutation({}, make_key(), make_atomic_cell(int32_type, 101));
|
||||
auto mmut1 = make_collection_mutation({}, make_key(), make_collection_member(int32_type, 101));
|
||||
mutation m1(s, key);
|
||||
m1.set_static_cell(column, my_list_type->serialize_mutation_form(mmut1));
|
||||
mt->apply(m1);
|
||||
auto mmut2 = make_collection_mutation({}, make_key(), make_atomic_cell(int32_type, 102));
|
||||
auto mmut2 = make_collection_mutation({}, make_key(), make_collection_member(int32_type, 102));
|
||||
mutation m2(s, key);
|
||||
m2.set_static_cell(column, my_list_type->serialize_mutation_form(mmut2));
|
||||
mt->apply(m2);
|
||||
auto mmut3 = make_collection_mutation({}, make_key(), make_atomic_cell(int32_type, 103));
|
||||
auto mmut3 = make_collection_mutation({}, make_key(), make_collection_member(int32_type, 103));
|
||||
mutation m3(s, key);
|
||||
m3.set_static_cell(column, my_list_type->serialize_mutation_form(mmut3));
|
||||
mt->apply(m3);
|
||||
auto mmut2o = make_collection_mutation({}, make_key(), make_atomic_cell(int32_type, 102));
|
||||
auto mmut2o = make_collection_mutation({}, make_key(), make_collection_member(int32_type, 102));
|
||||
mutation m2o(s, key);
|
||||
m2o.set_static_cell(column, my_list_type->serialize_mutation_form(mmut2o));
|
||||
mt->apply(m2o);
|
||||
@@ -302,7 +309,8 @@ SEASTAR_TEST_CASE(test_list_mutations) {
|
||||
auto i = r.find_cell(column.id);
|
||||
BOOST_REQUIRE(i);
|
||||
auto cell = i->as_collection_mutation();
|
||||
auto muts = my_list_type->deserialize_mutation_form(cell);
|
||||
auto cell_b = cell.data.linearize();
|
||||
auto muts = my_list_type->deserialize_mutation_form(cell_b);
|
||||
BOOST_REQUIRE(muts.cells.size() == 4);
|
||||
// FIXME: more strict tests
|
||||
});
|
||||
@@ -347,7 +355,7 @@ SEASTAR_TEST_CASE(test_multiple_memtables_one_partition) {
|
||||
BOOST_REQUIRE(i);
|
||||
auto cell = i->as_atomic_cell(r1_col);
|
||||
BOOST_REQUIRE(cell.is_live());
|
||||
BOOST_REQUIRE(int32_type->equal(cell.value(), int32_type->decompose(r1)));
|
||||
BOOST_REQUIRE(int32_type->equal(cell.value().linearize(), int32_type->decompose(r1)));
|
||||
}
|
||||
};
|
||||
verify_row(1001, 2001);
|
||||
@@ -485,7 +493,7 @@ SEASTAR_TEST_CASE(test_multiple_memtables_multiple_partitions) {
|
||||
auto c1 = value_cast<int32_t>(int32_type->deserialize(re.key().explode(*s)[0]));
|
||||
auto cell = re.row().cells().find_cell(r1_col.id);
|
||||
if (cell) {
|
||||
result[p1][c1] = value_cast<int32_t>(int32_type->deserialize(cell->as_atomic_cell(r1_col).value()));
|
||||
result[p1][c1] = value_cast<int32_t>(int32_type->deserialize(cell->as_atomic_cell(r1_col).value().linearize()));
|
||||
}
|
||||
}
|
||||
return true;
|
||||
@@ -899,7 +907,8 @@ SEASTAR_TEST_CASE(test_mutation_diff) {
|
||||
BOOST_REQUIRE(m2_1.find_row(*s, ckey2));
|
||||
BOOST_REQUIRE(m2_1.find_row(*s, ckey2)->find_cell(2));
|
||||
auto cmv = m2_1.find_row(*s, ckey2)->find_cell(2)->as_collection_mutation();
|
||||
auto cm = my_set_type->deserialize_mutation_form(cmv);
|
||||
auto cmv_b = cmv.data.linearize();
|
||||
auto cm = my_set_type->deserialize_mutation_form(cmv_b);
|
||||
BOOST_REQUIRE(cm.cells.size() == 1);
|
||||
BOOST_REQUIRE(cm.cells.front().first == int32_type->decompose(3));
|
||||
|
||||
@@ -916,7 +925,8 @@ SEASTAR_TEST_CASE(test_mutation_diff) {
|
||||
BOOST_REQUIRE(!m1_2.find_row(*s, ckey2)->find_cell(0));
|
||||
BOOST_REQUIRE(!m1_2.find_row(*s, ckey2)->find_cell(1));
|
||||
cmv = m1_2.find_row(*s, ckey2)->find_cell(2)->as_collection_mutation();
|
||||
cm = my_set_type->deserialize_mutation_form(cmv);
|
||||
cmv_b = cmv.data.linearize();
|
||||
cm = my_set_type->deserialize_mutation_form(cmv_b);
|
||||
BOOST_REQUIRE(cm.cells.size() == 1);
|
||||
BOOST_REQUIRE(cm.cells.front().first == int32_type->decompose(2));
|
||||
|
||||
@@ -962,7 +972,7 @@ SEASTAR_TEST_CASE(test_large_blobs) {
|
||||
BOOST_REQUIRE(i);
|
||||
auto cell = i->as_atomic_cell(s1_col);
|
||||
BOOST_REQUIRE(cell.is_live());
|
||||
BOOST_REQUIRE(bytes_type->equal(cell.value(), bytes_type->decompose(data_value(blob1))));
|
||||
BOOST_REQUIRE(bytes_type->equal(cell.value().linearize(), bytes_type->decompose(data_value(blob1))));
|
||||
|
||||
// Stress managed_bytes::linearize and scatter by merging a value into the cell
|
||||
mutation m2(s, key);
|
||||
@@ -975,7 +985,7 @@ SEASTAR_TEST_CASE(test_large_blobs) {
|
||||
BOOST_REQUIRE(i2);
|
||||
auto cell2 = i2->as_atomic_cell(s1_col);
|
||||
BOOST_REQUIRE(cell2.is_live());
|
||||
BOOST_REQUIRE(bytes_type->equal(cell2.value(), bytes_type->decompose(data_value(blob2))));
|
||||
BOOST_REQUIRE(bytes_type->equal(cell2.value().linearize(), bytes_type->decompose(data_value(blob2))));
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
@@ -114,7 +114,7 @@ public:
|
||||
if (!ac.is_live()) {
|
||||
throw std::runtime_error("cell is dead");
|
||||
}
|
||||
return std::make_pair(value_cast<sstring>(utf8_type->deserialize(ac.value())), ac.timestamp());
|
||||
return std::make_pair(value_cast<sstring>(utf8_type->deserialize(ac.value().linearize())), ac.timestamp());
|
||||
}
|
||||
|
||||
mutation_fragment make_row(const clustering_key& key, sstring v) {
|
||||
|
||||
@@ -823,7 +823,8 @@ SEASTAR_TEST_CASE(datafile_generation_11) {
|
||||
auto cell = r->find_cell(set_col.id);
|
||||
BOOST_REQUIRE(cell);
|
||||
auto t = static_pointer_cast<const collection_type_impl>(set_col.type);
|
||||
return t->deserialize_mutation_form(cell->as_collection_mutation());
|
||||
auto bv = cell->as_collection_mutation().data.linearize();
|
||||
return t->deserialize_mutation_form(bv).materialize(*t);
|
||||
};
|
||||
|
||||
auto sst = make_sstable(s, tmpdir_path, 11, la, big);
|
||||
@@ -832,7 +833,7 @@ SEASTAR_TEST_CASE(datafile_generation_11) {
|
||||
return do_with(make_dkey(s, "key1"), [sstp, s, verifier, tomb, &static_set_col] (auto& key) {
|
||||
auto rd = make_lw_shared<flat_mutation_reader>(sstp->read_row_flat(s, key));
|
||||
return read_mutation_from_flat_mutation_reader(*rd).then([sstp, s, verifier, tomb, &static_set_col, rd] (auto mutation) {
|
||||
auto verify_set = [&tomb] (auto m) {
|
||||
auto verify_set = [&tomb] (const collection_type_impl::mutation& m) {
|
||||
BOOST_REQUIRE(bool(m.tomb) == true);
|
||||
BOOST_REQUIRE(m.tomb == tomb);
|
||||
BOOST_REQUIRE(m.cells.size() == 3);
|
||||
@@ -849,7 +850,8 @@ SEASTAR_TEST_CASE(datafile_generation_11) {
|
||||
|
||||
// The static set
|
||||
auto t = static_pointer_cast<const collection_type_impl>(static_set_col.type);
|
||||
auto mut = t->deserialize_mutation_form(scol->as_collection_mutation());
|
||||
auto bv = scol->as_collection_mutation().data.linearize();
|
||||
auto mut = t->deserialize_mutation_form(bv).materialize(*t);
|
||||
verify_set(mut);
|
||||
|
||||
// The clustered set
|
||||
@@ -2875,7 +2877,7 @@ SEASTAR_TEST_CASE(test_counter_read) {
|
||||
BOOST_REQUIRE(mfopt->is_clustering_row());
|
||||
const clustering_row* cr = &mfopt->as_clustering_row();
|
||||
cr->cells().for_each_cell([&] (column_id id, const atomic_cell_or_collection& c) {
|
||||
counter_cell_view ccv { c.as_atomic_cell(s->regular_column_at(id)) };
|
||||
counter_cell_view::with_linearized(c.as_atomic_cell(s->regular_column_at(id)), [&] (counter_cell_view ccv) {
|
||||
auto& col = s->column_at(column_kind::regular_column, id);
|
||||
if (col.name_as_text() == "c1") {
|
||||
BOOST_REQUIRE_EQUAL(ccv.total_value(), 13);
|
||||
@@ -2896,6 +2898,7 @@ SEASTAR_TEST_CASE(test_counter_read) {
|
||||
} else {
|
||||
BOOST_FAIL(sprint("Unexpected column \'%s\'", col.name_as_text()));
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
mfopt = reader().get0();
|
||||
@@ -4373,11 +4376,12 @@ SEASTAR_TEST_CASE(test_wrong_counter_shard_order) {
|
||||
size_t n = 0;
|
||||
row.cells().for_each_cell([&] (column_id id, const atomic_cell_or_collection& ac_o_c) {
|
||||
auto acv = ac_o_c.as_atomic_cell(s->regular_column_at(id));
|
||||
auto ccv = counter_cell_view(acv);
|
||||
counter_cell_view::with_linearized(acv, [&] (counter_cell_view ccv) {
|
||||
counter_shard_view::less_compare_by_id cmp;
|
||||
BOOST_REQUIRE_MESSAGE(boost::algorithm::is_sorted(ccv.shards(), cmp), ccv << " is expected to be sorted");
|
||||
BOOST_REQUIRE_EQUAL(ccv.total_value(), expected_value);
|
||||
n++;
|
||||
});
|
||||
});
|
||||
BOOST_REQUIRE_EQUAL(n, 5);
|
||||
};
|
||||
|
||||
@@ -559,7 +559,9 @@ inline void match(const row& row, const schema& s, bytes col, const data_value&
|
||||
}
|
||||
|
||||
auto expected = cdef->type->decompose(value);
|
||||
BOOST_REQUIRE(c.value() == expected);
|
||||
auto val = c.value().linearize();
|
||||
assert(val == expected);
|
||||
BOOST_REQUIRE(c.value().linearize() == expected);
|
||||
if (timestamp) {
|
||||
BOOST_REQUIRE(c.timestamp() == timestamp);
|
||||
}
|
||||
@@ -592,9 +594,11 @@ match_collection(const row& row, const schema& s, bytes col, const tombstone& t)
|
||||
BOOST_CHECK_NO_THROW(row.cell_at(cdef->id));
|
||||
auto c = row.cell_at(cdef->id).as_collection_mutation();
|
||||
auto ctype = static_pointer_cast<const collection_type_impl>(cdef->type);
|
||||
auto&& mut = ctype->deserialize_mutation_form(c);
|
||||
return c.data.with_linearized([&] (bytes_view c_bv) {
|
||||
auto&& mut = ctype->deserialize_mutation_form(c_bv);
|
||||
BOOST_REQUIRE(mut.tomb == t);
|
||||
return mut.materialize(*ctype);
|
||||
});
|
||||
}
|
||||
|
||||
template <status Status>
|
||||
@@ -612,7 +616,7 @@ inline void match_collection_element(const std::pair<bytes, atomic_cell>& elemen
|
||||
// the schema for the set type, and is enough for the purposes of this
|
||||
// test.
|
||||
if (expected_serialized_value) {
|
||||
BOOST_REQUIRE(element.second.value() == *expected_serialized_value);
|
||||
BOOST_REQUIRE(element.second.value().linearize() == *expected_serialized_value);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
66
types.cc
66
types.cc
@@ -2098,7 +2098,9 @@ collection_type_impl::as_cql3_type() const {
|
||||
|
||||
bytes
|
||||
collection_type_impl::to_value(collection_mutation_view mut, cql_serialization_format sf) const {
|
||||
return to_value(deserialize_mutation_form(mut), sf);
|
||||
return mut.data.with_linearized([&] (bytes_view bv) {
|
||||
return to_value(deserialize_mutation_form(bv), sf);
|
||||
});
|
||||
}
|
||||
|
||||
collection_type_impl::mutation
|
||||
@@ -2422,12 +2424,19 @@ map_type_impl::serialized_values(std::vector<atomic_cell> cells) const {
|
||||
|
||||
bytes
|
||||
map_type_impl::to_value(mutation_view mut, cql_serialization_format sf) const {
|
||||
std::vector<bytes> linearized;
|
||||
std::vector<bytes_view> tmp;
|
||||
tmp.reserve(mut.cells.size() * 2);
|
||||
for (auto&& e : mut.cells) {
|
||||
if (e.second.is_live(mut.tomb, false)) {
|
||||
tmp.emplace_back(e.first);
|
||||
tmp.emplace_back(e.second.value());
|
||||
auto value_view = e.second.value();
|
||||
if (value_view.is_fragmented()) {
|
||||
auto& v = linearized.emplace_back(value_view.linearize());
|
||||
tmp.emplace_back(v);
|
||||
} else {
|
||||
tmp.emplace_back(value_view.first_fragment());
|
||||
}
|
||||
}
|
||||
}
|
||||
return pack(tmp.begin(), tmp.end(), tmp.size() / 2, sf);
|
||||
@@ -2477,8 +2486,7 @@ bool map_type_impl::references_duration() const {
|
||||
return _keys->references_duration() || _values->references_duration();
|
||||
}
|
||||
|
||||
auto collection_type_impl::deserialize_mutation_form(collection_mutation_view cm) const -> mutation_view {
|
||||
auto&& in = cm.data;
|
||||
auto collection_type_impl::deserialize_mutation_form(bytes_view in) const -> mutation_view {
|
||||
mutation_view ret;
|
||||
auto has_tomb = read_simple<bool>(in);
|
||||
if (has_tomb) {
|
||||
@@ -2502,13 +2510,14 @@ auto collection_type_impl::deserialize_mutation_form(collection_mutation_view cm
|
||||
}
|
||||
|
||||
bool collection_type_impl::is_empty(collection_mutation_view cm) const {
|
||||
auto&& in = cm.data;
|
||||
return cm.data.with_linearized([&] (bytes_view in) { // FIXME: we can guarantee that this is in the first fragment
|
||||
auto has_tomb = read_simple<bool>(in);
|
||||
return !has_tomb && read_simple<uint32_t>(in) == 0;
|
||||
});
|
||||
}
|
||||
|
||||
bool collection_type_impl::is_any_live(collection_mutation_view cm, tombstone tomb, gc_clock::time_point now) const {
|
||||
auto&& in = cm.data;
|
||||
return cm.data.with_linearized([&] (bytes_view in) {
|
||||
auto has_tomb = read_simple<bool>(in);
|
||||
if (has_tomb) {
|
||||
auto ts = read_simple<api::timestamp_type>(in);
|
||||
@@ -2526,10 +2535,11 @@ bool collection_type_impl::is_any_live(collection_mutation_view cm, tombstone to
|
||||
}
|
||||
}
|
||||
return false;
|
||||
});
|
||||
}
|
||||
|
||||
api::timestamp_type collection_type_impl::last_update(collection_mutation_view cm) const {
|
||||
auto&& in = cm.data;
|
||||
return cm.data.with_linearized([&] (bytes_view in) {
|
||||
api::timestamp_type max = api::missing_timestamp;
|
||||
auto has_tomb = read_simple<bool>(in);
|
||||
if (has_tomb) {
|
||||
@@ -2545,11 +2555,13 @@ api::timestamp_type collection_type_impl::last_update(collection_mutation_view c
|
||||
max = std::max(value.timestamp(), max);
|
||||
}
|
||||
return max;
|
||||
});
|
||||
}
|
||||
|
||||
template <typename Iterator>
|
||||
collection_mutation
|
||||
do_serialize_mutation_form(
|
||||
const collection_type_impl& ctype,
|
||||
const tombstone& tomb,
|
||||
boost::iterator_range<Iterator> cells) {
|
||||
auto element_size = [] (size_t c, auto&& e) -> size_t {
|
||||
@@ -2577,9 +2589,10 @@ do_serialize_mutation_form(
|
||||
auto&& k = kv.first;
|
||||
auto&& v = kv.second;
|
||||
writeb(k);
|
||||
|
||||
writeb(v.serialize());
|
||||
}
|
||||
return collection_mutation{std::move(ret)};
|
||||
return collection_mutation(std::move(ret));
|
||||
}
|
||||
|
||||
bool collection_type_impl::mutation::compact_and_expire(row_tombstone base_tomb, gc_clock::time_point query_time,
|
||||
@@ -2619,26 +2632,28 @@ bool collection_type_impl::mutation::compact_and_expire(row_tombstone base_tomb,
|
||||
}
|
||||
|
||||
collection_mutation
|
||||
collection_type_impl::serialize_mutation_form(const mutation& mut) {
|
||||
return do_serialize_mutation_form(mut.tomb, boost::make_iterator_range(mut.cells.begin(), mut.cells.end()));
|
||||
collection_type_impl::serialize_mutation_form(const mutation& mut) const {
|
||||
return do_serialize_mutation_form(*this, mut.tomb, boost::make_iterator_range(mut.cells.begin(), mut.cells.end()));
|
||||
}
|
||||
|
||||
collection_mutation
|
||||
collection_type_impl::serialize_mutation_form(mutation_view mut) {
|
||||
return do_serialize_mutation_form(mut.tomb, boost::make_iterator_range(mut.cells.begin(), mut.cells.end()));
|
||||
collection_type_impl::serialize_mutation_form(mutation_view mut) const {
|
||||
return do_serialize_mutation_form(*this, mut.tomb, boost::make_iterator_range(mut.cells.begin(), mut.cells.end()));
|
||||
}
|
||||
|
||||
collection_mutation
|
||||
collection_type_impl::serialize_mutation_form_only_live(mutation_view mut, gc_clock::time_point now) {
|
||||
return do_serialize_mutation_form(mut.tomb, mut.cells | boost::adaptors::filtered([t = mut.tomb, now] (auto&& e) {
|
||||
collection_type_impl::serialize_mutation_form_only_live(mutation_view mut, gc_clock::time_point now) const {
|
||||
return do_serialize_mutation_form(*this, mut.tomb, mut.cells | boost::adaptors::filtered([t = mut.tomb, now] (auto&& e) {
|
||||
return e.second.is_live(t, now, false);
|
||||
}));
|
||||
}
|
||||
|
||||
collection_mutation
|
||||
collection_type_impl::merge(collection_mutation_view a, collection_mutation_view b) const {
|
||||
auto aa = deserialize_mutation_form(a);
|
||||
auto bb = deserialize_mutation_form(b);
|
||||
return a.data.with_linearized([&] (bytes_view a_in) {
|
||||
return b.data.with_linearized([&] (bytes_view b_in) {
|
||||
auto aa = deserialize_mutation_form(a_in);
|
||||
auto bb = deserialize_mutation_form(b_in);
|
||||
mutation_view merged;
|
||||
merged.cells.reserve(aa.cells.size() + bb.cells.size());
|
||||
using element_type = std::pair<bytes_view, atomic_cell_view>;
|
||||
@@ -2672,13 +2687,17 @@ collection_type_impl::merge(collection_mutation_view a, collection_mutation_view
|
||||
merge);
|
||||
merged.tomb = std::max(aa.tomb, bb.tomb);
|
||||
return serialize_mutation_form(merged);
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
collection_mutation
|
||||
collection_type_impl::difference(collection_mutation_view a, collection_mutation_view b) const
|
||||
{
|
||||
auto aa = deserialize_mutation_form(a);
|
||||
auto bb = deserialize_mutation_form(b);
|
||||
return a.data.with_linearized([&] (bytes_view a_in) {
|
||||
return b.data.with_linearized([&] (bytes_view b_in) {
|
||||
auto aa = deserialize_mutation_form(a_in);
|
||||
auto bb = deserialize_mutation_form(b_in);
|
||||
mutation_view diff;
|
||||
diff.cells.reserve(std::max(aa.cells.size(), bb.cells.size()));
|
||||
auto key_type = name_comparator();
|
||||
@@ -2698,6 +2717,8 @@ collection_type_impl::difference(collection_mutation_view a, collection_mutation
|
||||
diff.tomb = aa.tomb;
|
||||
}
|
||||
return serialize_mutation_form(diff);
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
bytes_opt
|
||||
@@ -3166,11 +3187,18 @@ list_type_impl::serialized_values(std::vector<atomic_cell> cells) const {
|
||||
|
||||
bytes
|
||||
list_type_impl::to_value(mutation_view mut, cql_serialization_format sf) const {
|
||||
std::vector<bytes> linearized;
|
||||
std::vector<bytes_view> tmp;
|
||||
tmp.reserve(mut.cells.size());
|
||||
for (auto&& e : mut.cells) {
|
||||
if (e.second.is_live(mut.tomb, false)) {
|
||||
tmp.emplace_back(e.second.value());
|
||||
auto value_view = e.second.value();
|
||||
if (value_view.is_fragmented()) {
|
||||
auto& v = linearized.emplace_back(value_view.linearize());
|
||||
tmp.emplace_back(v);
|
||||
} else {
|
||||
tmp.emplace_back(value_view.first_fragment());
|
||||
}
|
||||
}
|
||||
}
|
||||
return pack(tmp.begin(), tmp.end(), tmp.size(), sf);
|
||||
|
||||
13
types.hh
13
types.hh
@@ -818,26 +818,29 @@ public:
|
||||
virtual shared_ptr<cql3::cql3_type> as_cql3_type() const override;
|
||||
template <typename BytesViewIterator>
|
||||
static bytes pack(BytesViewIterator start, BytesViewIterator finish, int elements, cql_serialization_format sf);
|
||||
mutation_view deserialize_mutation_form(collection_mutation_view in) const;
|
||||
// requires linearized collection_mutation_view, lifetime
|
||||
mutation_view deserialize_mutation_form(bytes_view in) const;
|
||||
bool is_empty(collection_mutation_view in) const;
|
||||
bool is_any_live(collection_mutation_view in, tombstone tomb = tombstone(), gc_clock::time_point now = gc_clock::time_point::min()) const;
|
||||
api::timestamp_type last_update(collection_mutation_view in) const;
|
||||
virtual bytes to_value(mutation_view mut, cql_serialization_format sf) const = 0;
|
||||
bytes to_value(collection_mutation_view mut, cql_serialization_format sf) const;
|
||||
// FIXME: use iterators?
|
||||
static collection_mutation serialize_mutation_form(const mutation& mut);
|
||||
static collection_mutation serialize_mutation_form(mutation_view mut);
|
||||
static collection_mutation serialize_mutation_form_only_live(mutation_view mut, gc_clock::time_point now);
|
||||
collection_mutation serialize_mutation_form(const mutation& mut) const;
|
||||
collection_mutation serialize_mutation_form(mutation_view mut) const;
|
||||
collection_mutation serialize_mutation_form_only_live(mutation_view mut, gc_clock::time_point now) const;
|
||||
collection_mutation merge(collection_mutation_view a, collection_mutation_view b) const;
|
||||
collection_mutation difference(collection_mutation_view a, collection_mutation_view b) const;
|
||||
// Calls Func(atomic_cell_view) for each cell in this collection.
|
||||
// noexcept if Func doesn't throw.
|
||||
template<typename Func>
|
||||
void for_each_cell(collection_mutation_view c, Func&& func) const {
|
||||
auto m_view = deserialize_mutation_form(std::move(c));
|
||||
c.data.with_linearized([&] (bytes_view c_bv) {
|
||||
auto m_view = deserialize_mutation_form(c_bv);
|
||||
for (auto&& c : m_view.cells) {
|
||||
func(std::move(c.second));
|
||||
}
|
||||
});
|
||||
}
|
||||
virtual void serialize(const void* value, bytes::iterator& out, cql_serialization_format sf) const = 0;
|
||||
virtual data_value deserialize(bytes_view v, cql_serialization_format sf) const = 0;
|
||||
|
||||
Reference in New Issue
Block a user