mirror of
https://github.com/scylladb/scylladb.git
synced 2026-05-12 19:02:12 +00:00
This will allow expressing lack of information about certain ranges of
rows (including the static row), which will be used in cache to
determine if information in cache is complete or not.
Continuity is represented internally using flags on row entries. The
key range between two consecutive entries is continuous iff
rows_entry::continuous() is true for the later entry. The range
starting after the last entry is assumed to be continuous. The range
corresponding to the key of the entry is continuous iff
rows_entry::dummy() is false.
[tgrabiec:
- based on the following commits:
4a5bf75 - Piotr Jastrzebski : mutation_partition: introduce dummy rows_entry
773070e - Piotr Jastrzebski : mutation_partition: add continuity flag to rows_entry
- documented that partition tombstone is always complete
- require specifying the partition tombstone when creating an incomplete entry
- replaced rows_entry(dummy_tag, ...) constructor with more general
rows_entry(position_in_partition, ...)
- documented continuity semantics on mutation_partition
- fixed _static_row_cached being lost by mutation_partition copy constructors
- fixed conversion to streamed_mutation to ignore dummy entries
- fixed mutation_partition serializer to drop dummy entries
- documented semantics of continuity on mutation_partition level
- dropped assumptions that dummy entries can be only at the last position
- changed equality to ignore continuity completely, rather than
partially (it was not ignoring dummy entries, but ignoring
continuity flag)
- added printout of continuity information in mutation_partition
- fixed handling of empty entries in apply_reversibly() with regards
to continuity; we no longer can remove empty entries before
merging, since that may affect continuity of the right-hand
mutation. Added _erased flag.
- fixed mutation_partition::clustered_row() with dummy==true to not ignore the key
- fixed partition_builder to not ignore continuity
- renamed dummy_tag_t to dummy_tag. _t suffix is reserved.
- standardized all APIs on is_dummy and is_continuous bool_class:es
- replaced add_dummy_entry() with ensure_last_dummy() with safer semantics
- dropped unused remove_dummy_entry()
- simplified and inlined cache_entry::add_dummy_entry()
- fixed mutation_partition(incomplete_tag) constructor to mark all row ranges as discontinuous
]
233 lines
9.0 KiB
C++
233 lines
9.0 KiB
C++
/*
|
|
* Copyright (C) 2015 ScyllaDB
|
|
*/
|
|
|
|
/*
|
|
* This file is part of Scylla.
|
|
*
|
|
* Scylla is free software: you can redistribute it and/or modify
|
|
* it under the terms of the GNU Affero General Public License as published by
|
|
* the Free Software Foundation, either version 3 of the License, or
|
|
* (at your option) any later version.
|
|
*
|
|
* Scylla is distributed in the hope that it will be useful,
|
|
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
* GNU General Public License for more details.
|
|
*
|
|
* You should have received a copy of the GNU General Public License
|
|
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
|
|
*/
|
|
|
|
#include <seastar/core/simple-stream.hh>
|
|
|
|
#include "mutation_partition_view.hh"
|
|
#include "schema.hh"
|
|
#include "atomic_cell.hh"
|
|
#include "utils/data_input.hh"
|
|
#include "mutation_partition_serializer.hh"
|
|
#include "mutation_partition.hh"
|
|
#include "counters.hh"
|
|
|
|
#include "utils/UUID.hh"
|
|
#include "serializer.hh"
|
|
#include "idl/uuid.dist.hh"
|
|
#include "idl/keys.dist.hh"
|
|
#include "idl/mutation.dist.hh"
|
|
#include "serializer_impl.hh"
|
|
#include "serialization_visitors.hh"
|
|
#include "idl/uuid.dist.impl.hh"
|
|
#include "idl/keys.dist.impl.hh"
|
|
#include "idl/mutation.dist.impl.hh"
|
|
|
|
using namespace db;
|
|
|
|
namespace {
|
|
|
|
using atomic_cell_variant = boost::variant<ser::live_cell_view,
|
|
ser::expiring_cell_view,
|
|
ser::dead_cell_view,
|
|
ser::counter_cell_view,
|
|
ser::unknown_variant_type>;
|
|
|
|
atomic_cell read_atomic_cell(atomic_cell_variant cv)
|
|
{
|
|
struct atomic_cell_visitor : boost::static_visitor<atomic_cell> {
|
|
atomic_cell operator()(ser::live_cell_view& lcv) const {
|
|
return atomic_cell::make_live(lcv.created_at(), lcv.value());
|
|
}
|
|
atomic_cell operator()(ser::expiring_cell_view& ecv) const {
|
|
return atomic_cell::make_live(ecv.c().created_at(), ecv.c().value(), ecv.expiry(), ecv.ttl());
|
|
}
|
|
atomic_cell operator()(ser::dead_cell_view& dcv) const {
|
|
return atomic_cell::make_dead(dcv.tomb().timestamp(), dcv.tomb().deletion_time());
|
|
}
|
|
atomic_cell operator()(ser::counter_cell_view& ccv) const {
|
|
class counter_cell_visitor : public boost::static_visitor<atomic_cell> {
|
|
api::timestamp_type _created_at;
|
|
public:
|
|
explicit counter_cell_visitor(api::timestamp_type ts)
|
|
: _created_at(ts) { }
|
|
|
|
atomic_cell operator()(ser::counter_cell_full_view& ccv) const {
|
|
// TODO: a lot of copying for something called view
|
|
counter_cell_builder ccb; // we know the final number of shards
|
|
for (auto csv : ccv.shards()) {
|
|
ccb.add_shard(counter_shard(csv));
|
|
}
|
|
return ccb.build(_created_at);
|
|
}
|
|
atomic_cell operator()(ser::counter_cell_update_view& ccv) const {
|
|
return atomic_cell::make_live_counter_update(_created_at, ccv.delta());
|
|
}
|
|
atomic_cell operator()(ser::unknown_variant_type&) const {
|
|
throw std::runtime_error("Trying to deserialize counter cell in unknown state");
|
|
}
|
|
};
|
|
auto v = ccv.value();
|
|
return boost::apply_visitor(counter_cell_visitor(ccv.created_at()), v);
|
|
}
|
|
atomic_cell operator()(ser::unknown_variant_type&) const {
|
|
throw std::runtime_error("Trying to deserialize cell in unknown state");
|
|
}
|
|
};
|
|
return boost::apply_visitor(atomic_cell_visitor(), cv);
|
|
}
|
|
|
|
collection_mutation read_collection_cell(ser::collection_cell_view cv)
|
|
{
|
|
collection_type_impl::mutation mut;
|
|
mut.tomb = cv.tomb();
|
|
auto&& elements = cv.elements();
|
|
mut.cells.reserve(elements.size());
|
|
for (auto&& e : elements) {
|
|
mut.cells.emplace_back(e.key(), read_atomic_cell(e.value()));
|
|
}
|
|
return collection_type_impl::serialize_mutation_form(mut);
|
|
}
|
|
|
|
template<typename Visitor>
|
|
void read_and_visit_row(ser::row_view rv, const column_mapping& cm, column_kind kind, Visitor&& visitor)
|
|
{
|
|
for (auto&& cv : rv.columns()) {
|
|
auto id = cv.id();
|
|
auto& col = cm.column_at(kind, id);
|
|
|
|
class atomic_cell_or_collection_visitor : public boost::static_visitor<> {
|
|
Visitor& _visitor;
|
|
column_id _id;
|
|
const column_mapping_entry& _col;
|
|
public:
|
|
explicit atomic_cell_or_collection_visitor(Visitor& v, column_id id, const column_mapping_entry& col)
|
|
: _visitor(v), _id(id), _col(col) { }
|
|
|
|
void operator()(atomic_cell_variant& acv) const {
|
|
if (!_col.type()->is_atomic()) {
|
|
throw std::runtime_error("A collection expected, got an atomic cell");
|
|
}
|
|
// FIXME: Pass view to cell to avoid copy
|
|
auto&& outer = current_allocator();
|
|
with_allocator(standard_allocator(), [&] {
|
|
auto cell = read_atomic_cell(acv);
|
|
with_allocator(outer, [&] {
|
|
_visitor.accept_atomic_cell(_id, cell);
|
|
});
|
|
});
|
|
}
|
|
void operator()(ser::collection_cell_view& ccv) const {
|
|
if (_col.type()->is_atomic()) {
|
|
throw std::runtime_error("An atomic cell expected, got a collection");
|
|
}
|
|
// FIXME: Pass view to cell to avoid copy
|
|
auto&& outer = current_allocator();
|
|
with_allocator(standard_allocator(), [&] {
|
|
auto cell = read_collection_cell(ccv);
|
|
with_allocator(outer, [&] {
|
|
_visitor.accept_collection(_id, cell);
|
|
});
|
|
});
|
|
}
|
|
void operator()(ser::unknown_variant_type&) const {
|
|
throw std::runtime_error("Trying to deserialize unknown cell type");
|
|
}
|
|
};
|
|
auto&& cell = cv.c();
|
|
boost::apply_visitor(atomic_cell_or_collection_visitor(visitor, id, col), cell);
|
|
}
|
|
}
|
|
|
|
row_marker read_row_marker(boost::variant<ser::live_marker_view, ser::expiring_marker_view, ser::dead_marker_view, ser::no_marker_view, ser::unknown_variant_type> rmv)
|
|
{
|
|
struct row_marker_visitor : boost::static_visitor<row_marker> {
|
|
row_marker operator()(ser::live_marker_view& lmv) const {
|
|
return row_marker(lmv.created_at());
|
|
}
|
|
row_marker operator()(ser::expiring_marker_view& emv) const {
|
|
return row_marker(emv.lm().created_at(), emv.ttl(), emv.expiry());
|
|
}
|
|
row_marker operator()(ser::dead_marker_view& dmv) const {
|
|
return row_marker(dmv.tomb());
|
|
}
|
|
row_marker operator()(ser::no_marker_view&) const {
|
|
return row_marker();
|
|
}
|
|
row_marker operator()(ser::unknown_variant_type&) const {
|
|
throw std::runtime_error("Trying to deserialize unknown row marker type");
|
|
}
|
|
};
|
|
return boost::apply_visitor(row_marker_visitor(), rmv);
|
|
}
|
|
|
|
}
|
|
|
|
void
|
|
mutation_partition_view::accept(const schema& s, mutation_partition_visitor& visitor) const {
|
|
accept(s.get_column_mapping(), visitor);
|
|
}
|
|
|
|
void
|
|
mutation_partition_view::accept(const column_mapping& cm, mutation_partition_visitor& visitor) const {
|
|
auto in = _in;
|
|
auto mpv = ser::deserialize(in, boost::type<ser::mutation_partition_view>());
|
|
|
|
visitor.accept_partition_tombstone(mpv.tomb());
|
|
|
|
struct static_row_cell_visitor {
|
|
mutation_partition_visitor& _visitor;
|
|
|
|
void accept_atomic_cell(column_id id, const atomic_cell& ac) const {
|
|
_visitor.accept_static_cell(id, ac);
|
|
}
|
|
void accept_collection(column_id id, const collection_mutation& cm) const {
|
|
_visitor.accept_static_cell(id, cm);
|
|
}
|
|
};
|
|
read_and_visit_row(mpv.static_row(), cm, column_kind::static_column, static_row_cell_visitor{visitor});
|
|
|
|
for (auto&& rt : mpv.range_tombstones()) {
|
|
visitor.accept_row_tombstone(rt);
|
|
}
|
|
|
|
for (auto&& cr : mpv.rows()) {
|
|
auto t = row_tombstone(cr.deleted_at(), shadowable_tombstone(cr.shadowable_deleted_at()));
|
|
visitor.accept_row(position_in_partition_view::for_key(cr.key()), t, read_row_marker(cr.marker()));
|
|
|
|
struct cell_visitor {
|
|
mutation_partition_visitor& _visitor;
|
|
|
|
void accept_atomic_cell(column_id id, const atomic_cell& ac) const {
|
|
_visitor.accept_row_cell(id, ac);
|
|
}
|
|
void accept_collection(column_id id, const collection_mutation& cm) const {
|
|
_visitor.accept_row_cell(id, cm);
|
|
}
|
|
};
|
|
read_and_visit_row(cr.cells(), cm, column_kind::regular_column, cell_visitor{visitor});
|
|
}
|
|
}
|
|
|
|
mutation_partition_view mutation_partition_view::from_view(ser::mutation_partition_view v)
|
|
{
|
|
return { v.v };
|
|
}
|