mirror of
https://github.com/scylladb/scylladb.git
synced 2026-04-24 10:30:38 +00:00
Merge branch 'tgrabiec/frozen-mutation-prep-v2' of github.com:cloudius-systems/seastar-dev into db
Preparatory changes before introducing frozen_mutation, from Tomasz.
This commit is contained in:
@@ -8,6 +8,7 @@
|
||||
#include "timestamp.hh"
|
||||
#include "tombstone.hh"
|
||||
#include "gc_clock.hh"
|
||||
#include "net/byteorder.hh"
|
||||
#include <cstdint>
|
||||
#include <iostream>
|
||||
|
||||
@@ -191,9 +192,14 @@ class collection_mutation {
|
||||
public:
|
||||
struct view {
|
||||
bytes_view data;
|
||||
bytes_view serialize() const { return data; }
|
||||
static view from_bytes(bytes_view v) { return { v }; }
|
||||
};
|
||||
struct one {
|
||||
bytes data;
|
||||
one() {}
|
||||
one(bytes b) : data(std::move(b)) {}
|
||||
one(view v) : data(v.data.begin(), v.data.end()) {}
|
||||
operator view() const { return { data }; }
|
||||
};
|
||||
};
|
||||
@@ -224,6 +230,9 @@ public:
|
||||
collection_mutation::view as_collection_mutation() const {
|
||||
return collection_mutation::view{_data};
|
||||
}
|
||||
bytes_view serialize() const {
|
||||
return _data;
|
||||
}
|
||||
friend std::ostream& operator<<(std::ostream&, const atomic_cell_or_collection&);
|
||||
};
|
||||
|
||||
|
||||
@@ -136,7 +136,7 @@ public:
|
||||
* (This is NOT a method to create a token from its string representation;
|
||||
* for that, use tokenFactory.fromString.)
|
||||
*/
|
||||
virtual token get_token(const schema& s, const partition_key& key) = 0;
|
||||
virtual token get_token(const schema& s, partition_key_view key) = 0;
|
||||
virtual token get_token(const sstables::key_view& key) = 0;
|
||||
|
||||
/**
|
||||
|
||||
@@ -42,7 +42,7 @@ murmur3_partitioner::get_token(const sstables::key_view& key) {
|
||||
}
|
||||
|
||||
token
|
||||
murmur3_partitioner::get_token(const schema& s, const partition_key& key) {
|
||||
murmur3_partitioner::get_token(const schema& s, partition_key_view key) {
|
||||
std::array<uint64_t, 2> hash;
|
||||
auto&& legacy = key.legacy_form(s);
|
||||
utils::murmur_hash::hash3_x64_128(legacy.begin(), legacy.size(), 0, hash);
|
||||
|
||||
@@ -12,7 +12,7 @@ namespace dht {
|
||||
|
||||
class murmur3_partitioner final : public i_partitioner {
|
||||
public:
|
||||
virtual token get_token(const schema& s, const partition_key& key);
|
||||
virtual token get_token(const schema& s, partition_key_view key);
|
||||
virtual token get_token(const sstables::key_view& key);
|
||||
virtual bool preserves_order() override { return false; }
|
||||
virtual std::map<token, float> describe_ownership(const std::vector<token>& sorted_tokens);
|
||||
|
||||
11
keys.cc
11
keys.cc
@@ -17,3 +17,14 @@ std::ostream& operator<<(std::ostream& out, const clustering_key& ck) {
|
||||
std::ostream& operator<<(std::ostream& out, const clustering_key_prefix& ckp) {
|
||||
return out << "ckp{" << to_hex(ckp) << "}";
|
||||
}
|
||||
|
||||
const legacy_compound_view<partition_key_view::c_type>
|
||||
partition_key_view::legacy_form(const schema& s) const {
|
||||
return { *get_compound_type(s), _bytes };
|
||||
}
|
||||
|
||||
int
|
||||
partition_key_view::legacy_tri_compare(const schema& s, const partition_key& o) const {
|
||||
auto cmp = legacy_compound_view<c_type>::tri_comparator(*get_compound_type(s));
|
||||
return cmp(this->representation(), o.representation());
|
||||
}
|
||||
|
||||
221
keys.hh
221
keys.hh
@@ -32,11 +32,81 @@
|
||||
// schema, from which information about structure is extracted.
|
||||
|
||||
class partition_key;
|
||||
class partition_key_view;
|
||||
class clustering_key;
|
||||
class clustering_key_view;
|
||||
class clustering_key_prefix;
|
||||
class clustering_key_prefix_view;
|
||||
|
||||
// Abstracts serialized compound, managed by compound_type.
|
||||
template <typename TopLevel>
|
||||
// Abstracts a view to serialized compound.
|
||||
template <typename TopLevelView>
|
||||
class compound_view_wrapper {
|
||||
protected:
|
||||
bytes_view _bytes;
|
||||
protected:
|
||||
compound_view_wrapper(bytes_view v)
|
||||
: _bytes(v)
|
||||
{ }
|
||||
|
||||
static inline const auto& get_compound_type(const schema& s) {
|
||||
return TopLevelView::get_compound_type(s);
|
||||
}
|
||||
public:
|
||||
std::vector<bytes> explode(const schema& s) const {
|
||||
return get_compound_type(s)->deserialize_value(_bytes);
|
||||
}
|
||||
|
||||
bytes_view representation() const {
|
||||
return _bytes;
|
||||
}
|
||||
|
||||
struct less_compare {
|
||||
typename TopLevelView::compound _t;
|
||||
less_compare(const schema& s) : _t(get_compound_type(s)) {}
|
||||
bool operator()(const TopLevelView& k1, const TopLevelView& k2) const {
|
||||
return _t->less(k1.representation(), k2.representation());
|
||||
}
|
||||
};
|
||||
|
||||
struct hashing {
|
||||
typename TopLevelView::compound _t;
|
||||
hashing(const schema& s) : _t(get_compound_type(s)) {}
|
||||
size_t operator()(const TopLevelView& o) const {
|
||||
return _t->hash(o.representation());
|
||||
}
|
||||
};
|
||||
|
||||
struct equality {
|
||||
typename TopLevelView::compound _t;
|
||||
equality(const schema& s) : _t(get_compound_type(s)) {}
|
||||
bool operator()(const TopLevelView& o1, const TopLevelView& o2) const {
|
||||
return _t->equal(o1.representation(), o2.representation());
|
||||
}
|
||||
};
|
||||
|
||||
bool equal(const schema& s, const TopLevelView& other) const {
|
||||
return get_compound_type(s)->equal(representation(), other.representation());
|
||||
}
|
||||
|
||||
// begin() and end() return iterators over components of this compound. The iterator yields a bytes_view to the component.
|
||||
// The iterators satisfy InputIterator concept.
|
||||
auto begin(const schema& s) const {
|
||||
return get_compound_type(s)->begin(representation());
|
||||
}
|
||||
|
||||
// See begin()
|
||||
auto end(const schema& s) const {
|
||||
return get_compound_type(s)->end(representation());
|
||||
}
|
||||
|
||||
bytes_view get_component(const schema& s, size_t idx) const {
|
||||
auto it = begin(s);
|
||||
std::advance(it, idx);
|
||||
return *it;
|
||||
}
|
||||
};
|
||||
|
||||
template <typename TopLevel, typename TopLevelView>
|
||||
class compound_wrapper {
|
||||
protected:
|
||||
bytes _bytes;
|
||||
@@ -74,6 +144,14 @@ public:
|
||||
return TopLevel::from_bytes(get_compound_type(s)->serialize_single(std::move(v)));
|
||||
}
|
||||
|
||||
TopLevelView view() const {
|
||||
return TopLevelView::from_bytes(_bytes);
|
||||
}
|
||||
|
||||
operator TopLevelView() const {
|
||||
return view();
|
||||
}
|
||||
|
||||
// FIXME: return views
|
||||
std::vector<bytes> explode(const schema& s) const {
|
||||
return get_compound_type(s)->deserialize_value(_bytes);
|
||||
@@ -83,7 +161,13 @@ public:
|
||||
typename TopLevel::compound _t;
|
||||
less_compare(const schema& s) : _t(get_compound_type(s)) {}
|
||||
bool operator()(const TopLevel& k1, const TopLevel& k2) const {
|
||||
return _t->less(k1, k2);
|
||||
return _t->less(k1.representation(), k2.representation());
|
||||
}
|
||||
bool operator()(const TopLevelView& k1, const TopLevel& k2) const {
|
||||
return _t->less(k1.representation(), k2.representation());
|
||||
}
|
||||
bool operator()(const TopLevel& k1, const TopLevelView& k2) const {
|
||||
return _t->less(k1.representation(), k2.representation());
|
||||
}
|
||||
};
|
||||
|
||||
@@ -99,18 +183,32 @@ public:
|
||||
typename TopLevel::compound _t;
|
||||
equality(const schema& s) : _t(get_compound_type(s)) {}
|
||||
bool operator()(const TopLevel& o1, const TopLevel& o2) const {
|
||||
return _t->equal(o1, o2);
|
||||
return _t->equal(o1.representation(), o2.representation());
|
||||
}
|
||||
bool operator()(const TopLevelView& o1, const TopLevel& o2) const {
|
||||
return _t->equal(o1.representation(), o2.representation());
|
||||
}
|
||||
bool operator()(const TopLevel& o1, const TopLevelView& o2) const {
|
||||
return _t->equal(o1.representation(), o2.representation());
|
||||
}
|
||||
};
|
||||
|
||||
bool equal(const schema& s, const TopLevel& other) const {
|
||||
return get_compound_type(s)->equal(*this, other);
|
||||
return get_compound_type(s)->equal(representation(), other.representation());
|
||||
}
|
||||
|
||||
bool equal(const schema& s, const TopLevelView& other) const {
|
||||
return get_compound_type(s)->equal(representation(), other.representation());
|
||||
}
|
||||
|
||||
operator bytes_view() const {
|
||||
return _bytes;
|
||||
}
|
||||
|
||||
bytes_view representation() const {
|
||||
return _bytes;
|
||||
}
|
||||
|
||||
// begin() and end() return iterators over components of this compound. The iterator yields a bytes_view to the component.
|
||||
// The iterators satisfy InputIterator concept.
|
||||
auto begin(const schema& s) const {
|
||||
@@ -176,9 +274,9 @@ public:
|
||||
};
|
||||
};
|
||||
|
||||
template <typename TopLevel, typename PrefixTopLevel>
|
||||
class prefixable_full_compound : public compound_wrapper<TopLevel> {
|
||||
using base = compound_wrapper<TopLevel>;
|
||||
template <typename TopLevel, typename TopLevelView, typename PrefixTopLevel>
|
||||
class prefixable_full_compound : public compound_wrapper<TopLevel, TopLevelView> {
|
||||
using base = compound_wrapper<TopLevel, TopLevelView>;
|
||||
protected:
|
||||
prefixable_full_compound(bytes&& b) : base(std::move(b)) {}
|
||||
public:
|
||||
@@ -247,14 +345,22 @@ public:
|
||||
}
|
||||
};
|
||||
|
||||
auto prefix_view(const schema& s, unsigned prefix_len) const {
|
||||
return prefix_view_type(s, *this, prefix_len);
|
||||
prefix_view_type prefix_view(const schema& s, unsigned prefix_len) const {
|
||||
return { s, this->representation(), prefix_len };
|
||||
}
|
||||
};
|
||||
|
||||
template <typename TopLevel, typename FullTopLevel>
|
||||
class prefix_compound_wrapper : public compound_wrapper<TopLevel> {
|
||||
using base = compound_wrapper<TopLevel>;
|
||||
class prefix_compound_view_wrapper : public compound_view_wrapper<TopLevel> {
|
||||
protected:
|
||||
prefix_compound_view_wrapper(bytes_view v)
|
||||
: compound_view_wrapper<TopLevel>(v)
|
||||
{ }
|
||||
};
|
||||
|
||||
template <typename TopLevel, typename TopLevelView, typename FullTopLevel>
|
||||
class prefix_compound_wrapper : public compound_wrapper<TopLevel, TopLevelView> {
|
||||
using base = compound_wrapper<TopLevel, TopLevelView>;
|
||||
protected:
|
||||
prefix_compound_wrapper(bytes&& b) : base(std::move(b)) {}
|
||||
public:
|
||||
@@ -276,11 +382,51 @@ public:
|
||||
}
|
||||
};
|
||||
|
||||
class partition_key : public compound_wrapper<partition_key> {
|
||||
class partition_key_view : public compound_view_wrapper<partition_key_view> {
|
||||
public:
|
||||
using c_type = compound_type<allow_prefixes::no>;
|
||||
private:
|
||||
partition_key_view(bytes_view v)
|
||||
: compound_view_wrapper<partition_key_view>(v)
|
||||
{ }
|
||||
public:
|
||||
partition_key(bytes&& b) : compound_wrapper<partition_key>(std::move(b)) {}
|
||||
using compound = lw_shared_ptr<c_type>;
|
||||
|
||||
static partition_key_view from_bytes(bytes_view v) {
|
||||
return { v };
|
||||
}
|
||||
|
||||
static const compound& get_compound_type(const schema& s) {
|
||||
return s.partition_key_type();
|
||||
}
|
||||
|
||||
// Returns key's representation which is compatible with Origin.
|
||||
// The result is valid as long as the schema is live.
|
||||
const legacy_compound_view<c_type> legacy_form(const schema& s) const;
|
||||
|
||||
// A trichotomic comparator for ordering compatible with Origin.
|
||||
int legacy_tri_compare(const schema& s, const partition_key& o) const;
|
||||
|
||||
// Checks if keys are equal in a way which is compatible with Origin.
|
||||
bool legacy_equal(const schema& s, const partition_key& o) const {
|
||||
return legacy_tri_compare(s, o) == 0;
|
||||
}
|
||||
|
||||
friend std::ostream& operator<<(std::ostream& out, const partition_key_view& pk);
|
||||
};
|
||||
|
||||
class partition_key : public compound_wrapper<partition_key, partition_key_view> {
|
||||
public:
|
||||
using c_type = compound_type<allow_prefixes::no>;
|
||||
private:
|
||||
partition_key(bytes&& b)
|
||||
: compound_wrapper<partition_key, partition_key_view>(std::move(b))
|
||||
{ }
|
||||
public:
|
||||
partition_key(const partition_key_view& key)
|
||||
: partition_key(bytes(key.representation().begin(), key.representation().end()))
|
||||
{ }
|
||||
|
||||
using compound = lw_shared_ptr<c_type>;
|
||||
|
||||
static partition_key from_bytes(bytes b) {
|
||||
@@ -294,18 +440,17 @@ public:
|
||||
// Returns key's representation which is compatible with Origin.
|
||||
// The result is valid as long as the schema is live.
|
||||
const legacy_compound_view<c_type> legacy_form(const schema& s) const {
|
||||
return { *get_compound_type(s), _bytes };
|
||||
return view().legacy_form(s);
|
||||
}
|
||||
|
||||
// A trichotomic comparator for ordering compatible with Origin.
|
||||
int legacy_tri_compare(const schema& s, const partition_key& o) const {
|
||||
auto cmp = legacy_compound_view<c_type>::tri_comparator(*get_compound_type(s));
|
||||
return cmp(*this, o);
|
||||
return view().legacy_tri_compare(s, o);
|
||||
}
|
||||
|
||||
// Checks if keys are equal in a way which is compatible with Origin.
|
||||
bool legacy_equal(const schema& s, const partition_key& o) const {
|
||||
return legacy_tri_compare(s, o) == 0;
|
||||
return view().legacy_equal(s, o);
|
||||
}
|
||||
|
||||
friend std::ostream& operator<<(std::ostream& out, const partition_key& pk);
|
||||
@@ -331,10 +476,26 @@ public:
|
||||
friend std::ostream& operator<<(std::ostream& os, const exploded_clustering_prefix& ecp);
|
||||
};
|
||||
|
||||
class clustering_key : public prefixable_full_compound<clustering_key, clustering_key_prefix> {
|
||||
class clustering_key_view : public compound_view_wrapper<clustering_key_view> {
|
||||
public:
|
||||
clustering_key(bytes&& b) : prefixable_full_compound<clustering_key, clustering_key_prefix>(std::move(b)) {}
|
||||
clustering_key_view(bytes_view v)
|
||||
: compound_view_wrapper<clustering_key_view>(v)
|
||||
{ }
|
||||
public:
|
||||
static clustering_key_view from_bytes(bytes_view v) {
|
||||
return { v };
|
||||
}
|
||||
};
|
||||
|
||||
class clustering_key : public prefixable_full_compound<clustering_key, clustering_key_view, clustering_key_prefix> {
|
||||
clustering_key(bytes&& b)
|
||||
: prefixable_full_compound<clustering_key, clustering_key_view, clustering_key_prefix>(std::move(b))
|
||||
{ }
|
||||
public:
|
||||
clustering_key(const clustering_key_view& v)
|
||||
: clustering_key(bytes(v.representation().begin(), v.representation().end()))
|
||||
{ }
|
||||
|
||||
using compound = lw_shared_ptr<compound_type<allow_prefixes::no>>;
|
||||
|
||||
static clustering_key from_bytes(bytes b) {
|
||||
@@ -353,9 +514,25 @@ public:
|
||||
friend std::ostream& operator<<(std::ostream& out, const clustering_key& ck);
|
||||
};
|
||||
|
||||
class clustering_key_prefix : public prefix_compound_wrapper<clustering_key_prefix, clustering_key> {
|
||||
clustering_key_prefix(bytes&& b) : prefix_compound_wrapper<clustering_key_prefix, clustering_key>(std::move(b)) {}
|
||||
class clustering_key_prefix_view : public prefix_compound_view_wrapper<clustering_key_prefix_view, clustering_key> {
|
||||
clustering_key_prefix_view(bytes_view v)
|
||||
: prefix_compound_view_wrapper<clustering_key_prefix_view, clustering_key>(v)
|
||||
{ }
|
||||
public:
|
||||
static clustering_key_prefix_view from_bytes(bytes_view v) {
|
||||
return { v };
|
||||
}
|
||||
};
|
||||
|
||||
class clustering_key_prefix : public prefix_compound_wrapper<clustering_key_prefix, clustering_key_prefix_view, clustering_key> {
|
||||
clustering_key_prefix(bytes&& b)
|
||||
: prefix_compound_wrapper<clustering_key_prefix, clustering_key_prefix_view, clustering_key>(std::move(b))
|
||||
{ }
|
||||
public:
|
||||
clustering_key_prefix(clustering_key_prefix_view v)
|
||||
: clustering_key_prefix(bytes(v.representation().begin(), v.representation().end()))
|
||||
{ }
|
||||
|
||||
using compound = lw_shared_ptr<compound_type<allow_prefixes::yes>>;
|
||||
|
||||
static clustering_key_prefix from_bytes(bytes b) {
|
||||
|
||||
@@ -88,3 +88,11 @@ void mutation::update_column(row& row, const column_definition& def, atomic_cell
|
||||
merge_column(def, i->second, value);
|
||||
}
|
||||
}
|
||||
|
||||
bool mutation::operator==(const mutation& m) const {
|
||||
return _dk.equal(*_schema, m._dk) && _p.equal(*_schema, m._p);
|
||||
}
|
||||
|
||||
bool mutation::operator!=(const mutation& m) const {
|
||||
return !(*this == m);
|
||||
}
|
||||
|
||||
@@ -35,6 +35,8 @@ public:
|
||||
const mutation_partition& partition() const { return _p; }
|
||||
mutation_partition& partition() { return _p; }
|
||||
const utils::UUID& column_family_id() const { return _schema->id(); }
|
||||
bool operator==(const mutation&) const;
|
||||
bool operator!=(const mutation&) const;
|
||||
private:
|
||||
static void update_column(row& row, const column_definition& def, atomic_cell_or_collection&& value);
|
||||
friend std::ostream& operator<<(std::ostream& os, const mutation& m);
|
||||
|
||||
@@ -31,7 +31,7 @@ mutation_partition::apply(schema_ptr schema, const mutation_partition& p) {
|
||||
_tombstone.apply(p._tombstone);
|
||||
|
||||
for (auto&& e : p._row_tombstones) {
|
||||
apply_row_tombstone(schema, e.prefix(), e.t());
|
||||
apply_row_tombstone(*schema, e.prefix(), e.t());
|
||||
}
|
||||
|
||||
auto merge_cells = [this, schema] (row& old_row, const row& new_row, auto&& find_column_def) {
|
||||
@@ -109,10 +109,10 @@ mutation_partition::tombstone_for_row(const schema& schema, const rows_entry& e)
|
||||
}
|
||||
|
||||
void
|
||||
mutation_partition::apply_row_tombstone(schema_ptr schema, clustering_key_prefix prefix, tombstone t) {
|
||||
assert(!prefix.is_full(*schema));
|
||||
auto i = _row_tombstones.lower_bound(prefix, row_tombstones_entry::compare(*schema));
|
||||
if (i == _row_tombstones.end() || !prefix.equal(*schema, i->prefix())) {
|
||||
mutation_partition::apply_row_tombstone(const schema& schema, clustering_key_prefix prefix, tombstone t) {
|
||||
assert(!prefix.is_full(schema));
|
||||
auto i = _row_tombstones.lower_bound(prefix, row_tombstones_entry::compare(schema));
|
||||
if (i == _row_tombstones.end() || !prefix.equal(schema, i->prefix())) {
|
||||
auto e = new row_tombstones_entry(std::move(prefix), t);
|
||||
_row_tombstones.insert(i, *e);
|
||||
} else {
|
||||
@@ -127,7 +127,7 @@ mutation_partition::apply_delete(schema_ptr schema, const exploded_clustering_pr
|
||||
} else if (prefix.is_full(*schema)) {
|
||||
apply_delete(schema, clustering_key::from_clustering_prefix(*schema, prefix), t);
|
||||
} else {
|
||||
apply_row_tombstone(schema, clustering_key_prefix::from_clustering_prefix(*schema, prefix), t);
|
||||
apply_row_tombstone(*schema, clustering_key_prefix::from_clustering_prefix(*schema, prefix), t);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -172,6 +172,16 @@ mutation_partition::clustered_row(const clustering_key& key) {
|
||||
return i->row();
|
||||
}
|
||||
|
||||
deletable_row&
|
||||
mutation_partition::clustered_row(const schema& s, const clustering_key_view& key) {
|
||||
auto i = _rows.find(key, rows_entry::compare(s));
|
||||
if (i == _rows.end()) {
|
||||
auto e = new rows_entry(key);
|
||||
_rows.insert(i, *e);
|
||||
return e->row();
|
||||
}
|
||||
return i->row();
|
||||
}
|
||||
|
||||
boost::iterator_range<mutation_partition::rows_type::const_iterator>
|
||||
mutation_partition::range(const schema& schema, const query::range<clustering_key_prefix>& r) const {
|
||||
@@ -336,3 +346,50 @@ operator<<(std::ostream& os, const mutation_partition& mp) {
|
||||
mp._tombstone, ::join(", ", mp._row_tombstones), mp._static_row,
|
||||
::join(", ", mp._rows));
|
||||
}
|
||||
|
||||
static bool
|
||||
rows_equal(const schema& s, const row& r1, const row& r2) {
|
||||
return std::equal(r1.begin(), r1.end(), r2.begin(), r2.end(),
|
||||
[] (const row::value_type& c1, const row::value_type& c2) {
|
||||
return c1.first == c2.first && c1.second.serialize() == c2.second.serialize();
|
||||
});
|
||||
}
|
||||
|
||||
bool
|
||||
deletable_row::equal(const schema& s, const deletable_row& other) const {
|
||||
if (t != other.t || created_at != other.created_at) {
|
||||
return false;
|
||||
}
|
||||
return rows_equal(s, cells, other.cells);
|
||||
}
|
||||
|
||||
bool
|
||||
rows_entry::equal(const schema& s, const rows_entry& other) const {
|
||||
return key().equal(s, other.key()) && row().equal(s, other.row());
|
||||
}
|
||||
|
||||
bool
|
||||
row_tombstones_entry::equal(const schema& s, const row_tombstones_entry& other) const {
|
||||
return prefix().equal(s, other.prefix()) && t() == other.t();
|
||||
}
|
||||
|
||||
bool mutation_partition::equal(const schema& s, const mutation_partition& p) const {
|
||||
if (_tombstone != p._tombstone) {
|
||||
return false;
|
||||
}
|
||||
|
||||
if (!std::equal(_rows.begin(), _rows.end(), p._rows.begin(), p._rows.end(),
|
||||
[&s] (const rows_entry& e1, const rows_entry& e2) { return e1.equal(s, e2); }
|
||||
)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
if (!std::equal(_row_tombstones.begin(), _row_tombstones.end(),
|
||||
p._row_tombstones.begin(), p._row_tombstones.end(),
|
||||
[&s] (const row_tombstones_entry& e1, const row_tombstones_entry& e2) { return e1.equal(s, e2); }
|
||||
)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
return rows_equal(s, _static_row, p._static_row);
|
||||
}
|
||||
|
||||
@@ -14,11 +14,13 @@
|
||||
#include "atomic_cell.hh"
|
||||
#include "query-result-writer.hh"
|
||||
|
||||
// FIXME: Encapsulate
|
||||
using row = std::map<column_id, atomic_cell_or_collection>;
|
||||
|
||||
std::ostream& operator<<(std::ostream& os, const row::value_type& rv);
|
||||
std::ostream& operator<<(std::ostream& os, const row& r);
|
||||
|
||||
// FIXME: Encapsulate
|
||||
struct deletable_row final {
|
||||
tombstone t;
|
||||
api::timestamp_type created_at = api::missing_timestamp;
|
||||
@@ -29,6 +31,7 @@ struct deletable_row final {
|
||||
}
|
||||
|
||||
friend std::ostream& operator<<(std::ostream& os, const deletable_row& dr);
|
||||
bool equal(const schema& s, const deletable_row& other) const;
|
||||
};
|
||||
|
||||
class row_tombstones_entry : public boost::intrusive::set_base_hook<> {
|
||||
@@ -86,6 +89,7 @@ public:
|
||||
}
|
||||
|
||||
friend std::ostream& operator<<(std::ostream& os, const row_tombstones_entry& rte);
|
||||
bool equal(const schema& s, const row_tombstones_entry& other) const;
|
||||
};
|
||||
|
||||
class rows_entry : public boost::intrusive::set_base_hook<> {
|
||||
@@ -129,6 +133,12 @@ public:
|
||||
bool operator()(const rows_entry& e, const clustering_key& key) const {
|
||||
return _c(e._key, key);
|
||||
}
|
||||
bool operator()(const clustering_key_view& key, const rows_entry& e) const {
|
||||
return _c(key, e._key);
|
||||
}
|
||||
bool operator()(const rows_entry& e, const clustering_key_view& key) const {
|
||||
return _c(e._key, key);
|
||||
}
|
||||
};
|
||||
template <typename Comparator>
|
||||
struct delegating_compare {
|
||||
@@ -148,6 +158,7 @@ public:
|
||||
return delegating_compare<Comparator>(std::move(c));
|
||||
}
|
||||
friend std::ostream& operator<<(std::ostream& os, const rows_entry& re);
|
||||
bool equal(const schema& s, const rows_entry& other) const;
|
||||
};
|
||||
|
||||
namespace db {
|
||||
@@ -159,6 +170,7 @@ class serializer;
|
||||
class mutation_partition final {
|
||||
// FIXME: using boost::intrusive because gcc's std::set<> does not support heterogeneous lookup yet
|
||||
using rows_type = boost::intrusive::set<rows_entry, boost::intrusive::compare<rows_entry::compare>>;
|
||||
using row_tombstones_type = boost::intrusive::set<row_tombstones_entry, boost::intrusive::compare<row_tombstones_entry::compare>>;
|
||||
private:
|
||||
tombstone _tombstone;
|
||||
row _static_row;
|
||||
@@ -166,7 +178,7 @@ private:
|
||||
// Contains only strict prefixes so that we don't have to lookup full keys
|
||||
// in both _row_tombstones and _rows.
|
||||
// FIXME: using boost::intrusive because gcc's std::set<> does not support heterogeneous lookup yet
|
||||
boost::intrusive::set<row_tombstones_entry, boost::intrusive::compare<row_tombstones_entry::compare>> _row_tombstones;
|
||||
row_tombstones_type _row_tombstones;
|
||||
|
||||
template<typename T>
|
||||
friend class db::serializer;
|
||||
@@ -185,13 +197,15 @@ public:
|
||||
void apply_delete(schema_ptr schema, const exploded_clustering_prefix& prefix, tombstone t);
|
||||
void apply_delete(schema_ptr schema, clustering_key&& key, tombstone t);
|
||||
// prefix must not be full
|
||||
void apply_row_tombstone(schema_ptr schema, clustering_key_prefix prefix, tombstone t);
|
||||
void apply_row_tombstone(const schema& schema, clustering_key_prefix prefix, tombstone t);
|
||||
void apply(schema_ptr schema, const mutation_partition& p);
|
||||
row& static_row() { return _static_row; }
|
||||
// return a set of rows_entry where each entry represents a CQL row sharing the same clustering key.
|
||||
const rows_type& clustered_rows() const { return _rows; }
|
||||
const row& static_row() const { return _static_row; }
|
||||
const row_tombstones_type& row_tombstones() const { return _row_tombstones; }
|
||||
deletable_row& clustered_row(const clustering_key& key);
|
||||
deletable_row& clustered_row(const schema& s, const clustering_key_view& key);
|
||||
const row* find_row(const clustering_key& key) const;
|
||||
const rows_entry* find_entry(schema_ptr schema, const clustering_key_prefix& key) const;
|
||||
tombstone range_tombstone_for_row(const schema& schema, const clustering_key& key) const;
|
||||
@@ -200,4 +214,6 @@ public:
|
||||
friend std::ostream& operator<<(std::ostream& os, const mutation_partition& mp);
|
||||
boost::iterator_range<rows_type::const_iterator> range(const schema& schema, const query::range<clustering_key_prefix>& r) const;
|
||||
void query(const schema& s, const query::partition_slice& slice, uint32_t limit, query::result::partition_writer& pw) const;
|
||||
public:
|
||||
bool equal(const schema& s, const mutation_partition&) const;
|
||||
};
|
||||
|
||||
52
schema_builder.hh
Normal file
52
schema_builder.hh
Normal file
@@ -0,0 +1,52 @@
|
||||
/*
|
||||
* Copyright 2015 Cloudius Systems
|
||||
*/
|
||||
|
||||
#pragma once
|
||||
|
||||
#include "schema.hh"
|
||||
|
||||
enum class column_kind { partition_key, clustering_key, static_column, regular_column };
|
||||
|
||||
struct schema_builder {
|
||||
sstring _ks_name;
|
||||
sstring _cf_name;
|
||||
std::vector<schema::column> _partition_key;
|
||||
std::vector<schema::column> _clustering_key;
|
||||
std::vector<schema::column> _static_columns;
|
||||
std::vector<schema::column> _regular_columns;
|
||||
public:
|
||||
schema_builder(const sstring& ks_name, const sstring& cf_name)
|
||||
: _ks_name(ks_name)
|
||||
, _cf_name(cf_name)
|
||||
{ }
|
||||
|
||||
schema_builder& with_column(bytes name, data_type type, column_kind kind = column_kind::regular_column) {
|
||||
switch (kind) {
|
||||
case column_kind::partition_key:
|
||||
_partition_key.emplace_back(schema::column{name, type});
|
||||
break;
|
||||
case column_kind::clustering_key:
|
||||
_clustering_key.emplace_back(schema::column{name, type});
|
||||
break;
|
||||
case column_kind::static_column:
|
||||
_static_columns.emplace_back(schema::column{name, type});
|
||||
break;
|
||||
case column_kind::regular_column:
|
||||
_regular_columns.emplace_back(schema::column{name, type});
|
||||
break;
|
||||
};
|
||||
return *this;
|
||||
}
|
||||
|
||||
schema_ptr build() {
|
||||
return make_lw_shared<schema>(schema({},
|
||||
_ks_name,
|
||||
_cf_name,
|
||||
_partition_key,
|
||||
_clustering_key,
|
||||
_regular_columns,
|
||||
_static_columns,
|
||||
utf8_type));
|
||||
}
|
||||
};
|
||||
@@ -96,3 +96,20 @@ BOOST_AUTO_TEST_CASE(test_legacy_ordering_for_composite_keys) {
|
||||
BOOST_REQUIRE(cmp(to_key("", "A"), to_key("A", "A")) < 0);
|
||||
BOOST_REQUIRE(cmp(to_key("A", ""), to_key("A", "A")) < 0);
|
||||
}
|
||||
|
||||
BOOST_AUTO_TEST_CASE(test_conversions_between_view_and_wrapper) {
|
||||
schema s({}, "", "", {{"c1", bytes_type}}, {}, {}, {}, utf8_type);
|
||||
|
||||
auto key = partition_key::from_deeply_exploded(s, {bytes("value")});
|
||||
partition_key_view key_view = key;
|
||||
|
||||
BOOST_REQUIRE(key_view.equal(s, key));
|
||||
BOOST_REQUIRE(key.equal(s, key_view));
|
||||
|
||||
partition_key key2 = key_view;
|
||||
|
||||
BOOST_REQUIRE(key2.equal(s, key));
|
||||
BOOST_REQUIRE(key.equal(s, key2));
|
||||
|
||||
BOOST_REQUIRE(*key.begin(s) == bytes("value"));
|
||||
}
|
||||
|
||||
@@ -57,18 +57,18 @@ BOOST_AUTO_TEST_CASE(test_multi_level_row_tombstones) {
|
||||
return clustering_key::from_deeply_exploded(*s, v);
|
||||
};
|
||||
|
||||
m.partition().apply_row_tombstone(s, make_prefix({1, 2}), tombstone(9, ttl));
|
||||
m.partition().apply_row_tombstone(*s, make_prefix({1, 2}), tombstone(9, ttl));
|
||||
BOOST_REQUIRE_EQUAL(m.partition().tombstone_for_row(*s, make_key({1, 2, 3})), tombstone(9, ttl));
|
||||
|
||||
m.partition().apply_row_tombstone(s, make_prefix({1, 3}), tombstone(8, ttl));
|
||||
m.partition().apply_row_tombstone(*s, make_prefix({1, 3}), tombstone(8, ttl));
|
||||
BOOST_REQUIRE_EQUAL(m.partition().tombstone_for_row(*s, make_key({1, 2, 0})), tombstone(9, ttl));
|
||||
BOOST_REQUIRE_EQUAL(m.partition().tombstone_for_row(*s, make_key({1, 3, 0})), tombstone(8, ttl));
|
||||
|
||||
m.partition().apply_row_tombstone(s, make_prefix({1}), tombstone(11, ttl));
|
||||
m.partition().apply_row_tombstone(*s, make_prefix({1}), tombstone(11, ttl));
|
||||
BOOST_REQUIRE_EQUAL(m.partition().tombstone_for_row(*s, make_key({1, 2, 0})), tombstone(11, ttl));
|
||||
BOOST_REQUIRE_EQUAL(m.partition().tombstone_for_row(*s, make_key({1, 3, 0})), tombstone(11, ttl));
|
||||
|
||||
m.partition().apply_row_tombstone(s, make_prefix({1, 4}), tombstone(6, ttl));
|
||||
m.partition().apply_row_tombstone(*s, make_prefix({1, 4}), tombstone(6, ttl));
|
||||
BOOST_REQUIRE_EQUAL(m.partition().tombstone_for_row(*s, make_key({1, 2, 0})), tombstone(11, ttl));
|
||||
BOOST_REQUIRE_EQUAL(m.partition().tombstone_for_row(*s, make_key({1, 3, 0})), tombstone(11, ttl));
|
||||
BOOST_REQUIRE_EQUAL(m.partition().tombstone_for_row(*s, make_key({1, 4, 0})), tombstone(11, ttl));
|
||||
@@ -89,13 +89,13 @@ BOOST_AUTO_TEST_CASE(test_row_tombstone_updates) {
|
||||
auto ttl = gc_clock::now() + std::chrono::seconds(1);
|
||||
|
||||
mutation m(key, s);
|
||||
m.partition().apply_row_tombstone(s, c_key1_prefix, tombstone(1, ttl));
|
||||
m.partition().apply_row_tombstone(s, c_key2_prefix, tombstone(0, ttl));
|
||||
m.partition().apply_row_tombstone(*s, c_key1_prefix, tombstone(1, ttl));
|
||||
m.partition().apply_row_tombstone(*s, c_key2_prefix, tombstone(0, ttl));
|
||||
|
||||
BOOST_REQUIRE_EQUAL(m.partition().tombstone_for_row(*s, c_key1), tombstone(1, ttl));
|
||||
BOOST_REQUIRE_EQUAL(m.partition().tombstone_for_row(*s, c_key2), tombstone(0, ttl));
|
||||
|
||||
m.partition().apply_row_tombstone(s, c_key2_prefix, tombstone(1, ttl));
|
||||
m.partition().apply_row_tombstone(*s, c_key2_prefix, tombstone(1, ttl));
|
||||
BOOST_REQUIRE_EQUAL(m.partition().tombstone_for_row(*s, c_key2), tombstone(1, ttl));
|
||||
}
|
||||
|
||||
|
||||
@@ -143,42 +143,6 @@ static atomic_cell make_atomic_cell(bytes value) {
|
||||
return atomic_cell::make_live(0, ttl_opt{}, std::move(value));
|
||||
}
|
||||
|
||||
inline bool operator==(const deletable_row& r1, const deletable_row& r2) {
|
||||
return r1.t == r2.t && r1.cells == r2.cells;
|
||||
}
|
||||
|
||||
inline bool operator==(const partition_key& m1, const partition_key& m2) {
|
||||
const bytes_view& b1 = m1;
|
||||
const bytes_view& b2 = m2;
|
||||
return b1 == b2;
|
||||
}
|
||||
|
||||
inline bool operator==(const rows_entry& r1, const rows_entry& r2) {
|
||||
return r1.row() == r2.row();
|
||||
}
|
||||
|
||||
inline bool operator!=(const rows_entry& r1, const rows_entry& r2) {
|
||||
return !(r1 == r2);
|
||||
}
|
||||
|
||||
// TODO: not complete... meh...
|
||||
inline bool operator==(const mutation_partition& cp1, const mutation_partition& cp2) {
|
||||
static schema dummy({}, "", "", {}, {}, {}, {}, utf8_type);
|
||||
auto& p1 = const_cast<mutation_partition&>(cp1);
|
||||
auto& p2 = const_cast<mutation_partition&>(cp2);
|
||||
return p1.static_row() == p2.static_row()
|
||||
&& p1.range(dummy, query::range<clustering_key_prefix>::make_open_ended_both_sides())
|
||||
== p2.range(dummy, query::range<clustering_key_prefix>::make_open_ended_both_sides())
|
||||
;
|
||||
}
|
||||
|
||||
inline bool operator==(const mutation& m1, const mutation& m2) {
|
||||
return m1.schema().get() == m2.schema().get()
|
||||
&& m1.key() == m2.key()
|
||||
&& m1.partition() == m2.partition()
|
||||
;
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(test_mutation){
|
||||
auto s = make_lw_shared(schema({}, some_keyspace, some_column_family,
|
||||
{{"p1", utf8_type}}, {{"c1", int32_type}}, {{"r1", int32_type}}, {}, utf8_type));
|
||||
|
||||
@@ -99,6 +99,14 @@ template<> inline bytes data_input::peek<bytes>() const {
|
||||
template<> inline size_t data_input::ssize<bytes>(const bytes & s) const {
|
||||
return sizeof(uint32_t) + s.size();
|
||||
}
|
||||
template<> inline bytes_view data_input::peek<bytes_view>() const {
|
||||
auto len = peek<uint32_t>();
|
||||
ensure(sizeof(uint32_t) + len);
|
||||
return bytes_view(_view.data() + sizeof(uint32_t), len);
|
||||
}
|
||||
template<> inline size_t data_input::ssize<bytes_view>(const bytes_view& v) const {
|
||||
return sizeof(uint32_t) + v.size();
|
||||
}
|
||||
template<> inline size_t data_input::ssize(const bool &) const {
|
||||
return sizeof(uint8_t);
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user