Merge branch 'tgrabiec/cleanups' of github.com:cloudius-systems/seastar-dev into db

Database core cleanups, from Tomasz.
This commit is contained in:
Avi Kivity
2015-05-13 11:08:21 +03:00
18 changed files with 233 additions and 192 deletions

View File

@@ -30,7 +30,7 @@ namespace statements {
void delete_statement::add_update_for_key(mutation& m, const exploded_clustering_prefix& prefix, const update_parameters& params) {
if (_column_operations.empty()) {
m.partition().apply_delete(s, prefix, params.make_tombstone());
m.partition().apply_delete(*s, prefix, params.make_tombstone());
return;
}

View File

@@ -59,7 +59,7 @@ void update_statement::add_update_for_key(mutation& m, const exploded_clustering
} else {
if (type == statement_type::INSERT && prefix) {
auto& row = m.partition().clustered_row(clustering_key::from_clustering_prefix(*s, prefix));
row.created_at = params.timestamp();
row.apply(params.timestamp());
}
}

View File

@@ -56,7 +56,7 @@ column_family::find_partition(const dht::decorated_key& key) const {
for (auto&& mt : _memtables) {
auto mp = mt.find_partition(key);
if (mp) {
ret.apply(_schema, *mp);
ret.apply(*_schema, *mp);
any = true;
}
}
@@ -157,7 +157,7 @@ column_family::for_all_partitions(Func&& func) const {
}
if (current) {
// FIXME: handle different schemas
current->second.apply(_schema, mp);
current->second.apply(*_schema, mp);
} else {
current = std::make_pair(key, mp);
}
@@ -179,7 +179,7 @@ column_family::for_all_partitions_slow(std::function<bool (const dht::decorated_
row&
column_family::find_or_create_row_slow(const partition_key& partition_key, const clustering_key& clustering_key) {
mutation_partition& p = find_or_create_partition_slow(partition_key);
return p.clustered_row(clustering_key).cells;
return p.clustered_row(clustering_key).cells();
}
class lister {
@@ -555,13 +555,13 @@ database::find_or_create_keyspace(const sstring& name) {
void
memtable::apply(const mutation& m) {
mutation_partition& p = find_or_create_partition(m.decorated_key());
p.apply(_schema, m.partition());
p.apply(*_schema, m.partition());
}
void
memtable::apply(const frozen_mutation& m) {
mutation_partition& p = find_or_create_partition_slow(m.key(*_schema));
p.apply(_schema, m.partition());
p.apply(*_schema, m.partition());
}
// Based on:
@@ -601,21 +601,6 @@ compare_atomic_cell_for_merge(atomic_cell_view left, atomic_cell_view right) {
return 0;
}
void
merge_column(const column_definition& def,
atomic_cell_or_collection& old,
const atomic_cell_or_collection& neww) {
if (def.is_atomic()) {
if (compare_atomic_cell_for_merge(old.as_atomic_cell(), neww.as_atomic_cell()) < 0) {
// FIXME: move()?
old = neww;
}
} else {
auto ct = static_pointer_cast<const collection_type_impl>(def.type);
old = ct->merge(old.as_collection_mutation(), neww.as_collection_mutation());
}
}
future<lw_shared_ptr<query::result>>
column_family::query(const query::read_command& cmd) const {
query::result::builder builder(cmd.slice);

View File

@@ -118,9 +118,7 @@ protected:
}
public:
static TopLevel make_empty(const schema& s) {
std::vector<bytes> v;
v.resize(get_compound_type(s)->types().size());
return from_exploded(s, v);
return from_exploded(s, {});
}
static TopLevel from_exploded(const schema& s, const std::vector<bytes>& v) {

View File

@@ -17,7 +17,7 @@ mutation::mutation(partition_key key_, schema_ptr schema)
{ }
void mutation::set_static_cell(const column_definition& def, atomic_cell_or_collection value) {
update_column(_p.static_row(), def, std::move(value));
_p.static_row().apply(def, std::move(value));
}
void mutation::set_static_cell(const bytes& name, const boost::any& value, api::timestamp_type timestamp, ttl_opt ttl) {
@@ -28,12 +28,12 @@ void mutation::set_static_cell(const bytes& name, const boost::any& value, api::
if (!column_def->is_static()) {
throw std::runtime_error(sprint("column '%s' is not static", name));
}
update_column(_p.static_row(), *column_def, atomic_cell::make_live(timestamp, column_def->type->decompose(value), ttl));
_p.static_row().apply(*column_def, atomic_cell::make_live(timestamp, column_def->type->decompose(value), ttl));
}
void mutation::set_clustered_cell(const exploded_clustering_prefix& prefix, const column_definition& def, atomic_cell_or_collection value) {
auto& row = _p.clustered_row(clustering_key::from_clustering_prefix(*_schema, prefix)).cells;
update_column(row, def, std::move(value));
auto& row = _p.clustered_row(clustering_key::from_clustering_prefix(*_schema, prefix)).cells();
row.apply(def, std::move(value));
}
void mutation::set_clustered_cell(const clustering_key& key, const bytes& name, const boost::any& value,
@@ -46,8 +46,8 @@ void mutation::set_clustered_cell(const clustering_key& key, const bytes& name,
}
void mutation::set_clustered_cell(const clustering_key& key, const column_definition& def, atomic_cell_or_collection value) {
auto& row = _p.clustered_row(key).cells;
update_column(row, def, std::move(value));
auto& row = _p.clustered_row(key).cells();
row.apply(def, std::move(value));
}
void mutation::set_cell(const exploded_clustering_prefix& prefix, const bytes& name, const boost::any& value,
@@ -71,32 +71,19 @@ void mutation::set_cell(const exploded_clustering_prefix& prefix, const column_d
std::experimental::optional<atomic_cell_or_collection>
mutation::get_cell(const clustering_key& rkey, const column_definition& def) const {
auto find_cell = [&def] (const row& r) {
auto i = r.find(def.id);
if (i == r.end()) {
return std::experimental::optional<atomic_cell_or_collection>{};
}
return std::experimental::optional<atomic_cell_or_collection>{i->second};
};
if (def.is_static()) {
return find_cell(_p.static_row());
const atomic_cell_or_collection* cell = _p.static_row().find_cell(def.id);
if (!cell) {
return {};
}
return { *cell };
} else {
auto r = _p.find_row(rkey);
const row* r = _p.find_row(rkey);
if (!r) {
return {};
}
return find_cell(*r);
}
}
void mutation::update_column(row& row, const column_definition& def, atomic_cell_or_collection&& value) {
// our mutations are not yet immutable
auto id = def.id;
auto i = row.lower_bound(id);
if (i == row.end() || i->first != id) {
row.emplace_hint(i, id, std::move(value));
} else {
merge_column(def, i->second, value);
const atomic_cell_or_collection* cell = r->find_cell(def.id);
return { *cell };
}
}

View File

@@ -39,6 +39,5 @@ public:
bool operator==(const mutation&) const;
bool operator!=(const mutation&) const;
private:
static void update_column(row& row, const column_definition& def, atomic_cell_or_collection&& value);
friend std::ostream& operator<<(std::ostream& os, const mutation& m);
};

View File

@@ -28,50 +28,42 @@ mutation_partition::operator=(const mutation_partition& x) {
}
void
mutation_partition::apply(schema_ptr schema, const mutation_partition& p) {
mutation_partition::apply(const schema& schema, const mutation_partition& p) {
_tombstone.apply(p._tombstone);
for (auto&& e : p._row_tombstones) {
apply_row_tombstone(*schema, e.prefix(), e.t());
apply_row_tombstone(schema, e.prefix(), e.t());
}
auto merge_cells = [this, schema] (row& old_row, const row& new_row, auto&& find_column_def) {
static auto merge_cells = [] (row& old_row, const row& new_row, auto&& find_column_def) {
for (auto&& new_column : new_row) {
auto col = new_column.first;
auto i = old_row.find(col);
if (i == old_row.end()) {
old_row.emplace_hint(i, new_column);
} else {
auto& old_column = *i;
auto& def = find_column_def(col);
merge_column(def, old_column.second, new_column.second);
}
old_row.apply(new_column.first, new_column.second, find_column_def);
}
};
auto find_static_column_def = [schema] (auto col) -> const column_definition& { return schema->static_column_at(col); };
auto find_regular_column_def = [schema] (auto col) -> const column_definition& { return schema->regular_column_at(col); };
auto find_static_column_def = [&schema] (auto col) -> const column_definition& { return schema.static_column_at(col); };
auto find_regular_column_def = [&schema] (auto col) -> const column_definition& { return schema.regular_column_at(col); };
merge_cells(_static_row, p._static_row, find_static_column_def);
for (auto&& entry : p._rows) {
auto& key = entry.key();
auto i = _rows.find(key, rows_entry::compare(*schema));
auto i = _rows.find(key, rows_entry::compare(schema));
if (i == _rows.end()) {
auto e = new rows_entry(entry);
_rows.insert(i, *e);
} else {
i->row().t.apply(entry.row().t);
i->row().created_at = std::max(i->row().created_at, entry.row().created_at);
merge_cells(i->row().cells, entry.row().cells, find_regular_column_def);
i->row().apply(entry.row().deleted_at());
i->row().apply(entry.row().created_at());
merge_cells(i->row().cells(), entry.row().cells(), find_regular_column_def);
}
}
}
void
mutation_partition::apply(schema_ptr schema, mutation_partition_view p) {
mutation_partition_applier applier(*schema, *this);
p.accept(*schema, applier);
mutation_partition::apply(const schema& schema, mutation_partition_view p) {
mutation_partition_applier applier(schema, *this);
p.accept(schema, applier);
}
tombstone
@@ -102,7 +94,7 @@ mutation_partition::tombstone_for_row(const schema& schema, const clustering_key
auto j = _rows.find(key, rows_entry::compare(schema));
if (j != _rows.end()) {
t.apply(j->row().t);
t.apply(j->row().deleted_at());
}
return t;
@@ -111,7 +103,7 @@ mutation_partition::tombstone_for_row(const schema& schema, const clustering_key
tombstone
mutation_partition::tombstone_for_row(const schema& schema, const rows_entry& e) const {
tombstone t = range_tombstone_for_row(schema, e.key());
t.apply(e.row().t);
t.apply(e.row().deleted_at());
return t;
}
@@ -128,24 +120,24 @@ mutation_partition::apply_row_tombstone(const schema& schema, clustering_key_pre
}
void
mutation_partition::apply_delete(schema_ptr schema, const exploded_clustering_prefix& prefix, tombstone t) {
mutation_partition::apply_delete(const schema& schema, const exploded_clustering_prefix& prefix, tombstone t) {
if (!prefix) {
apply(t);
} else if (prefix.is_full(*schema)) {
apply_delete(schema, clustering_key::from_clustering_prefix(*schema, prefix), t);
} else if (prefix.is_full(schema)) {
apply_delete(schema, clustering_key::from_clustering_prefix(schema, prefix), t);
} else {
apply_row_tombstone(*schema, clustering_key_prefix::from_clustering_prefix(*schema, prefix), t);
apply_row_tombstone(schema, clustering_key_prefix::from_clustering_prefix(schema, prefix), t);
}
}
void
mutation_partition::apply_delete(schema_ptr schema, clustering_key&& key, tombstone t) {
clustered_row(*schema, std::move(key)).apply(t);
mutation_partition::apply_delete(const schema& schema, clustering_key&& key, tombstone t) {
clustered_row(schema, std::move(key)).apply(t);
}
void
mutation_partition::apply_delete(schema_ptr schema, clustering_key_view key, tombstone t) {
clustered_row(*schema, key).apply(t);
mutation_partition::apply_delete(const schema& schema, clustering_key_view key, tombstone t) {
clustered_row(schema, key).apply(t);
}
void
@@ -154,8 +146,8 @@ mutation_partition::apply_insert(const schema& s, clustering_key_view key, api::
}
const rows_entry*
mutation_partition::find_entry(schema_ptr schema, const clustering_key_prefix& key) const {
auto i = _rows.find(key, rows_entry::key_comparator(clustering_key::less_compare_with_prefix(*schema)));
mutation_partition::find_entry(const schema& schema, const clustering_key_prefix& key) const {
auto i = _rows.find(key, rows_entry::key_comparator(clustering_key::less_compare_with_prefix(schema)));
if (i == _rows.end()) {
return nullptr;
}
@@ -168,7 +160,7 @@ mutation_partition::find_row(const clustering_key& key) const {
if (i == _rows.end()) {
return nullptr;
}
return &i->row().cells;
return &i->row().cells();
}
deletable_row&
@@ -228,22 +220,22 @@ template <typename ColumnDefResolver>
static void get_row_slice(const row& cells, const std::vector<column_id>& columns, tombstone tomb,
ColumnDefResolver&& id_to_def, query::result::row_writer& writer) {
for (auto id : columns) {
auto i = cells.find(id);
if (i == cells.end()) {
const atomic_cell_or_collection* cell = cells.find_cell(id);
if (!cell) {
writer.add_empty();
} else {
auto&& def = id_to_def(id);
if (def.is_atomic()) {
auto c = i->second.as_atomic_cell();
auto c = cell->as_atomic_cell();
if (!c.is_live(tomb)) {
writer.add_empty();
} else {
writer.add(i->second.as_atomic_cell());
writer.add(cell->as_atomic_cell());
}
} else {
auto&& cell = i->second.as_collection_mutation();
auto&& mut = cell->as_collection_mutation();
auto&& ctype = static_pointer_cast<const collection_type_impl>(def.type);
auto m_view = ctype->deserialize_mutation_form(cell);
auto m_view = ctype->deserialize_mutation_form(mut);
m_view.tomb.apply(tomb);
auto m_ser = ctype->serialize_mutation_form_only_live(m_view);
if (ctype->is_empty(m_ser)) {
@@ -311,10 +303,10 @@ mutation_partition::query(const schema& s,
// only one lookup for a full-tuple singular range though.
for (const rows_entry& e : range(s, row_range)) {
auto& row = e.row();
auto&& cells = row.cells;
auto&& cells = row.cells();
auto row_tombstone = tombstone_for_row(s, e);
auto row_is_live = row.created_at > row_tombstone.timestamp;
auto row_is_live = row.created_at() > row_tombstone.timestamp;
// row_is_live is true for rows created using 'insert' statement
// which are not deleted yet. Such rows are considered as present
@@ -348,7 +340,7 @@ operator<<(std::ostream& os, const row& r) {
std::ostream&
operator<<(std::ostream& os, const deletable_row& dr) {
return fprint(os, "{deletable_row: %s %s %s}", dr.created_at, dr.t, dr.cells);
return fprint(os, "{deletable_row: %s %s %s}", dr._created_at, dr._deleted_at, dr._cells);
}
std::ostream&
@@ -378,10 +370,10 @@ rows_equal(const schema& s, const row& r1, const row& r2) {
bool
deletable_row::equal(const schema& s, const deletable_row& other) const {
if (t != other.t || created_at != other.created_at) {
if (_deleted_at != other._deleted_at || _created_at != other._created_at) {
return false;
}
return rows_equal(s, cells, other.cells);
return rows_equal(s, _cells, other._cells);
}
bool
@@ -414,3 +406,44 @@ bool mutation_partition::equal(const schema& s, const mutation_partition& p) con
return rows_equal(s, _static_row, p._static_row);
}
void
merge_column(const column_definition& def,
atomic_cell_or_collection& old,
const atomic_cell_or_collection& neww) {
if (def.is_atomic()) {
if (compare_atomic_cell_for_merge(old.as_atomic_cell(), neww.as_atomic_cell()) < 0) {
// FIXME: move()?
old = neww;
}
} else {
auto ct = static_pointer_cast<const collection_type_impl>(def.type);
old = ct->merge(old.as_collection_mutation(), neww.as_collection_mutation());
}
}
void
row::apply(const column_definition& column, atomic_cell_or_collection value) {
// our mutations are not yet immutable
auto id = column.id;
auto i = _cells.lower_bound(id);
if (i == _cells.end() || i->first != id) {
_cells.emplace_hint(i, id, std::move(value));
} else {
merge_column(column, i->second, value);
}
}
void
row::append_cell(column_id id, atomic_cell_or_collection value) {
_cells.emplace_hint(_cells.end(), id, std::move(value));
}
const atomic_cell_or_collection*
row::find_cell(column_id id) const {
auto i = _cells.find(id);
if (i == _cells.end()) {
return nullptr;
}
return &i->second;
}

View File

@@ -15,28 +15,70 @@
#include "query-result-writer.hh"
#include "mutation_partition_view.hh"
// FIXME: Encapsulate
using row = std::map<column_id, atomic_cell_or_collection>;
// Container for cells of a row. Cells are identified by column_id.
//
// Can be used as a range of std::pair<column_id, atomic_cell_or_collection>.
//
class row {
using map_type = std::map<column_id, atomic_cell_or_collection>;
map_type _cells;
public:
using value_type = map_type::value_type;
using iterator = map_type::iterator;
using const_iterator = map_type::const_iterator;
public:
iterator begin() { return _cells.begin(); }
iterator end() { return _cells.end(); }
const_iterator begin() const { return _cells.begin(); }
const_iterator end() const { return _cells.end(); }
size_t size() const { return _cells.size(); }
// Returns a reference to cell's value or throws std::out_of_range
const atomic_cell_or_collection& cell_at(column_id id) const { return _cells.at(id); }
// Returns a pointer to cell's value or nullptr if column is not set.
const atomic_cell_or_collection* find_cell(column_id id) const;
public:
// Merges cell's value into the row.
void apply(const column_definition& column, atomic_cell_or_collection cell);
// Adds cell to the row. The column must not be already set.
void append_cell(column_id id, atomic_cell_or_collection cell);
// Merges given cell into the row.
template <typename ColumnDefinitionResolver>
void apply(column_id id, atomic_cell_or_collection cell, ColumnDefinitionResolver&& resolver) {
auto i = _cells.lower_bound(id);
if (i == _cells.end() || i->first != id) {
_cells.emplace_hint(i, id, std::move(cell));
} else {
merge_column(resolver(id), i->second, std::move(cell));
}
}
};
std::ostream& operator<<(std::ostream& os, const row::value_type& rv);
std::ostream& operator<<(std::ostream& os, const row& r);
// FIXME: Encapsulate
struct deletable_row final {
tombstone t;
api::timestamp_type created_at = api::missing_timestamp;
row cells;
class deletable_row final {
tombstone _deleted_at;
api::timestamp_type _created_at = api::missing_timestamp;
row _cells;
public:
deletable_row() {}
void apply(tombstone t_) {
t.apply(t_);
void apply(tombstone deleted_at) {
_deleted_at.apply(deleted_at);
}
void apply(api::timestamp_type new_created_at) {
if (new_created_at > created_at) {
created_at = new_created_at;
}
void apply(api::timestamp_type created_at) {
_created_at = std::max(_created_at, created_at);
}
public:
tombstone deleted_at() const { return _deleted_at; }
api::timestamp_type created_at() const { return _created_at; }
const row& cells() const { return _cells; }
row& cells() { return _cells; }
friend std::ostream& operator<<(std::ostream& os, const deletable_row& dr);
bool equal(const schema& s, const deletable_row& other) const;
};
@@ -200,34 +242,36 @@ public:
~mutation_partition();
mutation_partition& operator=(const mutation_partition& x);
mutation_partition& operator=(mutation_partition&& x) = default;
tombstone partition_tombstone() const { return _tombstone; }
bool equal(const schema& s, const mutation_partition&) const;
friend std::ostream& operator<<(std::ostream& os, const mutation_partition& mp);
public:
void apply(tombstone t) { _tombstone.apply(t); }
void apply_delete(schema_ptr schema, const exploded_clustering_prefix& prefix, tombstone t);
void apply_delete(schema_ptr schema, clustering_key&& key, tombstone t);
void apply_delete(schema_ptr schema, clustering_key_view key, tombstone t);
void apply_delete(const schema& schema, const exploded_clustering_prefix& prefix, tombstone t);
void apply_delete(const schema& schema, clustering_key&& key, tombstone t);
void apply_delete(const schema& schema, clustering_key_view key, tombstone t);
// Equivalent to applying a mutation with an empty row, created with given timestamp
void apply_insert(const schema& s, clustering_key_view, api::timestamp_type created_at);
// prefix must not be full
void apply_row_tombstone(const schema& schema, clustering_key_prefix prefix, tombstone t);
void apply(schema_ptr schema, const mutation_partition& p);
void apply(schema_ptr schema, mutation_partition_view);
row& static_row() { return _static_row; }
// return a set of rows_entry where each entry represents a CQL row sharing the same clustering key.
const rows_type& clustered_rows() const { return _rows; }
const row& static_row() const { return _static_row; }
const row_tombstones_type& row_tombstones() const { return _row_tombstones; }
void apply(const schema& schema, const mutation_partition& p);
void apply(const schema& schema, mutation_partition_view);
public:
deletable_row& clustered_row(const clustering_key& key);
deletable_row& clustered_row(clustering_key&& key);
deletable_row& clustered_row(const schema& s, const clustering_key_view& key);
public:
tombstone partition_tombstone() const { return _tombstone; }
row& static_row() { return _static_row; }
const row& static_row() const { return _static_row; }
// return a set of rows_entry where each entry represents a CQL row sharing the same clustering key.
const rows_type& clustered_rows() const { return _rows; }
const row_tombstones_type& row_tombstones() const { return _row_tombstones; }
const row* find_row(const clustering_key& key) const;
const rows_entry* find_entry(schema_ptr schema, const clustering_key_prefix& key) const;
const rows_entry* find_entry(const schema& schema, const clustering_key_prefix& key) const;
tombstone range_tombstone_for_row(const schema& schema, const clustering_key& key) const;
tombstone tombstone_for_row(const schema& schema, const clustering_key& key) const;
tombstone tombstone_for_row(const schema& schema, const rows_entry& e) const;
friend std::ostream& operator<<(std::ostream& os, const mutation_partition& mp);
boost::iterator_range<rows_type::const_iterator> range(const schema& schema, const query::range<clustering_key_prefix>& r) const;
// Returns at most "limit" rows. The limit must be greater than 0.
void query(const schema& s, const query::partition_slice& slice, uint32_t limit, query::result::partition_writer& pw) const;
public:
bool equal(const schema& s, const mutation_partition&) const;
};

View File

@@ -13,16 +13,6 @@ class mutation_partition_applier : public mutation_partition_visitor {
const schema& _schema;
mutation_partition& _p;
deletable_row* _current_row;
private:
template <typename ColumnIdResolver>
void apply_cell(row& r, column_id id, atomic_cell_or_collection c, ColumnIdResolver&& resolver) {
auto i = r.lower_bound(id);
if (i == r.end() || i->first != id) {
r.emplace_hint(i, id, std::move(c));
} else {
merge_column(resolver(id), i->second, std::move(c));
}
}
public:
mutation_partition_applier(const schema& s, mutation_partition& target)
: _schema(s), _p(target) { }
@@ -32,12 +22,12 @@ public:
}
virtual void accept_static_cell(column_id id, atomic_cell_view cell) override {
apply_cell(_p._static_row, id, atomic_cell_or_collection(cell),
_p._static_row.apply(id, atomic_cell_or_collection(cell),
[this](column_id id) -> const column_definition& { return _schema.static_column_at(id); });
}
virtual void accept_static_cell(column_id id, collection_mutation::view collection) override {
apply_cell(_p._static_row, id, atomic_cell_or_collection(collection),
_p._static_row.apply(id, atomic_cell_or_collection(collection),
[this](column_id id) -> const column_definition& { return _schema.static_column_at(id); });
}
@@ -53,12 +43,12 @@ public:
}
virtual void accept_row_cell(column_id id, atomic_cell_view cell) override {
apply_cell(_current_row->cells, id, atomic_cell_or_collection(cell),
_current_row->cells().apply(id, atomic_cell_or_collection(cell),
[this](column_id id) -> const column_definition& { return _schema.regular_column_at(id); });
}
virtual void accept_row_cell(column_id id, collection_mutation::view collection) override {
apply_cell(_current_row->cells, id, atomic_cell_or_collection(collection),
_current_row->cells().apply(id, atomic_cell_or_collection(collection),
[this](column_id id) -> const column_definition& { return _schema.regular_column_at(id); });
}
};

View File

@@ -53,9 +53,9 @@ mutation_partition_serializer::size(const schema& schema, const mutation_partiti
for (const rows_entry& e : p.clustered_rows()) {
size += clustering_key_view_serializer(e.key()).size();
size += sizeof(api::timestamp_type); // e.row().created_at
size += tombstone_serializer(e.row().t).size();
size += tombstone_serializer(e.row().deleted_at()).size();
size += sizeof(count_type); // e.row().cells.size()
for (auto&& cell_entry : e.row().cells) {
for (auto&& cell_entry : e.row().cells()) {
size += sizeof(column_id);
const column_definition& def = schema.regular_column_at(cell_entry.first);
if (def.is_atomic()) {
@@ -96,10 +96,10 @@ mutation_partition_serializer::write(data_output& out, const schema& schema, con
// rows
for (const rows_entry& e : p.clustered_rows()) {
clustering_key_view_serializer::write(out, e.key());
out.write(e.row().created_at);
tombstone_serializer::write(out, e.row().t);
out.write<count_type>(e.row().cells.size());
for (auto&& cell_entry : e.row().cells) {
out.write(e.row().created_at());
tombstone_serializer::write(out, e.row().deleted_at());
out.write<count_type>(e.row().cells().size());
for (auto&& cell_entry : e.row().cells()) {
out.write(cell_entry.first);
const column_definition& def = schema.regular_column_at(cell_entry.first);
if (def.is_atomic()) {

View File

@@ -27,12 +27,12 @@ public:
virtual void accept_static_cell(column_id id, atomic_cell_view cell) override {
row& r = _partition.static_row();
r.emplace_hint(r.end(), id, atomic_cell_or_collection(cell));
r.append_cell(id, atomic_cell_or_collection(cell));
}
virtual void accept_static_cell(column_id id, collection_mutation::view collection) override {
row& r = _partition.static_row();
r.emplace_hint(r.end(), id, atomic_cell_or_collection(collection));
r.append_cell(id, atomic_cell_or_collection(collection));
}
virtual void accept_row_tombstone(clustering_key_prefix_view prefix, tombstone t) override {
@@ -41,18 +41,18 @@ public:
virtual void accept_row(clustering_key_view key, api::timestamp_type created_at, tombstone deleted_at) override {
deletable_row& r = _partition.clustered_row(_schema, key);
r.created_at = created_at;
r.apply(created_at);
r.apply(deleted_at);
_current_row = &r;
}
virtual void accept_row_cell(column_id id, atomic_cell_view cell) override {
row& r = _current_row->cells;
r.emplace_hint(r.end(), id, atomic_cell_or_collection(cell));
row& r = _current_row->cells();
r.append_cell(id, atomic_cell_or_collection(cell));
}
virtual void accept_row_cell(column_id id, collection_mutation::view collection) override {
row& r = _current_row->cells;
r.emplace_hint(r.end(), id, atomic_cell_or_collection(collection));
row& r = _current_row->cells();
r.append_cell(id, atomic_cell_or_collection(collection));
}
};

View File

@@ -119,6 +119,8 @@ std::ostream& operator<<(std::ostream& out, const range<U>& r) {
using partition_range = range<partition_key>;
using clustering_range = range<clustering_key_prefix>;
// Specifies subset of rows, columns and cell attributes to be returned in a query.
// Can be accessed across cores.
class partition_slice {
public:
enum class option { send_clustering_key, send_partition_key, send_timestamp_and_expiry };
@@ -142,6 +144,9 @@ public:
friend std::ostream& operator<<(std::ostream& out, const partition_slice& ps);
};
// Full specification of a query to the database.
// Intended for passing across replicas.
// Can be accessed across cores.
class read_command {
public:
utils::UUID cf_id;

View File

@@ -184,7 +184,7 @@ public:
if (col.cell.size() == 0) {
auto clustering_key = clustering_key::from_clustering_prefix(*_schema, clustering_prefix);
auto& dr = mut->partition().clustered_row(clustering_key);
dr.created_at = timestamp;
dr.apply(timestamp);
return;
}

View File

@@ -933,14 +933,14 @@ static future<> write_cell(output_stream<char>& out, atomic_cell_view cell) {
static future<> write_row_marker(output_stream<char>& out, const rows_entry& clustered_row, bytes& clustering_key) {
// Missing created_at (api::missing_timestamp) means no row marker.
if (clustered_row.row().created_at == api::missing_timestamp) {
if (clustered_row.row().created_at() == api::missing_timestamp) {
return make_ready_future<>();
}
// Write row mark cell to the beginning of clustered row.
return write_column_name(out, clustering_key, {}).then([&out, &clustered_row] {
column_mask mask = column_mask::none;
uint64_t timestamp = clustered_row.row().created_at;
uint64_t timestamp = clustered_row.row().created_at();
uint32_t value_length = 0;
return write(out, mask, timestamp, value_length);
@@ -956,10 +956,10 @@ static future<> write_clustered_row(output_stream<char>& out, schema_ptr schema,
return write_row_marker(out, clustered_row, clustering_key).then(
[&out, &clustered_row, schema, &clustering_key] {
// FIXME: Before writing cells, range tombstone must be written if the row has any (deletable_row::t).
assert(!clustered_row.row().t);
assert(!clustered_row.row().deleted_at());
// Write all cells of a partition's row.
return do_for_each(clustered_row.row().cells, [&out, schema, &clustering_key] (auto& value) {
return do_for_each(clustered_row.row().cells(), [&out, schema, &clustering_key] (auto& value) {
auto column_id = value.first;
auto&& column_definition = schema->regular_column_at(column_id);
// non atomic cell isn't supported yet. atomic cell maps to a single trift cell.

View File

@@ -136,19 +136,19 @@ public:
assert(row != nullptr);
auto col_def = schema->get_column_definition(utf8_type->decompose(column_name));
assert(col_def != nullptr);
auto i = row->find(col_def->id);
if (i == row->end()) {
const atomic_cell_or_collection* cell = row->find_cell(col_def->id);
if (!cell) {
assert(((void)"column not set", 0));
}
bytes actual;
if (!col_def->type->is_multi_cell()) {
auto cell = i->second.as_atomic_cell();
assert(cell.is_live());
actual = { cell.value().begin(), cell.value().end() };
auto c = cell->as_atomic_cell();
assert(c.is_live());
actual = { c.value().begin(), c.value().end() };
} else {
auto cell = i->second.as_collection_mutation();
auto c = cell->as_collection_mutation();
auto type = dynamic_pointer_cast<const collection_type_impl>(col_def->type);
actual = type->to_value(type->deserialize_mutation_form(cell),
actual = type->to_value(type->deserialize_mutation_form(c),
serialization_format::internal());
}
assert(col_def->type->equal(actual, col_def->type->decompose(expected)));

View File

@@ -49,7 +49,7 @@ BOOST_AUTO_TEST_CASE(test_writing_and_reading) {
test_freezing(m);
m.partition().apply_delete(s, ck2, new_tombstone());
m.partition().apply_delete(*s, ck2, new_tombstone());
test_freezing(m);
@@ -107,16 +107,16 @@ BOOST_AUTO_TEST_CASE(test_application_of_partition_view_has_the_same_effect_as_a
m2.set_static_cell("static_1", bytes("val5"), new_timestamp());
mutation m_frozen(key, s);
m_frozen.partition().apply(s, freeze(m1).partition());
m_frozen.partition().apply(s, freeze(m2).partition());
m_frozen.partition().apply(*s, freeze(m1).partition());
m_frozen.partition().apply(*s, freeze(m2).partition());
mutation m_unfrozen(key, s);
m_unfrozen.partition().apply(s, m1.partition());
m_unfrozen.partition().apply(s, m2.partition());
m_unfrozen.partition().apply(*s, m1.partition());
m_unfrozen.partition().apply(*s, m2.partition());
mutation m_refrozen(key, s);
m_refrozen.partition().apply(s, freeze(m1).unfreeze(s).partition());
m_refrozen.partition().apply(s, freeze(m2).unfreeze(s).partition());
m_refrozen.partition().apply(*s, freeze(m1).unfreeze(s).partition());
m_refrozen.partition().apply(*s, freeze(m2).unfreeze(s).partition());
assert_that(m_unfrozen).is_equal_to(m_refrozen);
assert_that(m_unfrozen).is_equal_to(m_frozen);

View File

@@ -33,9 +33,9 @@ BOOST_AUTO_TEST_CASE(test_mutation_is_applied) {
cf.apply(std::move(m));
row& r = cf.find_or_create_row_slow(key, c_key);
auto i = r.find(r1_col.id);
BOOST_REQUIRE(i != r.end());
auto cell = i->second.as_atomic_cell();
auto i = r.find_cell(r1_col.id);
BOOST_REQUIRE(i);
auto cell = i->as_atomic_cell();
BOOST_REQUIRE(cell.is_live());
BOOST_REQUIRE(int32_type->equal(cell.value(), int32_type->decompose(3)));
}
@@ -124,9 +124,9 @@ BOOST_AUTO_TEST_CASE(test_map_mutations) {
cf.apply(m2o);
row& r = cf.find_or_create_partition_slow(key).static_row();
auto i = r.find(column.id);
BOOST_REQUIRE(i != r.end());
auto cell = i->second.as_collection_mutation();
auto i = r.find_cell(column.id);
BOOST_REQUIRE(i);
auto cell = i->as_collection_mutation();
auto muts = my_map_type->deserialize_mutation_form(cell);
BOOST_REQUIRE(muts.cells.size() == 3);
// FIXME: more strict tests
@@ -157,9 +157,9 @@ BOOST_AUTO_TEST_CASE(test_set_mutations) {
cf.apply(m2o);
row& r = cf.find_or_create_partition_slow(key).static_row();
auto i = r.find(column.id);
BOOST_REQUIRE(i != r.end());
auto cell = i->second.as_collection_mutation();
auto i = r.find_cell(column.id);
BOOST_REQUIRE(i);
auto cell = i->as_collection_mutation();
auto muts = my_set_type->deserialize_mutation_form(cell);
BOOST_REQUIRE(muts.cells.size() == 3);
// FIXME: more strict tests
@@ -191,9 +191,9 @@ BOOST_AUTO_TEST_CASE(test_list_mutations) {
cf.apply(m2o);
row& r = cf.find_or_create_partition_slow(key).static_row();
auto i = r.find(column.id);
BOOST_REQUIRE(i != r.end());
auto cell = i->second.as_collection_mutation();
auto i = r.find_cell(column.id);
BOOST_REQUIRE(i);
auto cell = i->as_collection_mutation();
auto muts = my_list_type->deserialize_mutation_form(cell);
BOOST_REQUIRE(muts.cells.size() == 4);
// FIXME: more strict tests
@@ -223,9 +223,9 @@ BOOST_AUTO_TEST_CASE(test_multiple_memtables_one_partition) {
auto c_key = clustering_key::from_exploded(*s, {int32_type->decompose(c1)});
auto r = cf.find_row(dht::global_partitioner().decorate_key(*s, key), c_key);
BOOST_REQUIRE(r);
auto i = r->find(r1_col.id);
BOOST_REQUIRE(i != r->end());
auto cell = i->second.as_atomic_cell();
auto i = r->find_cell(r1_col.id);
BOOST_REQUIRE(i);
auto cell = i->as_atomic_cell();
BOOST_REQUIRE(cell.is_live());
BOOST_REQUIRE(int32_type->equal(cell.value(), int32_type->decompose(r1)));
};
@@ -267,9 +267,9 @@ BOOST_AUTO_TEST_CASE(test_multiple_memtables_multiple_partitions) {
auto p1 = boost::any_cast<int32_t>(int32_type->deserialize(pk._key.explode(*s)[0]));
for (const rows_entry& re : mp.range(*s, query::range<clustering_key_prefix>())) {
auto c1 = boost::any_cast<int32_t>(int32_type->deserialize(re.key().explode(*s)[0]));
auto i = re.row().cells.find(r1_col.id);
if (i != re.row().cells.end()) {
result[p1][c1] = boost::any_cast<int32_t>(int32_type->deserialize(i->second.as_atomic_cell().value()));
auto cell = re.row().cells().find_cell(r1_col.id);
if (cell) {
result[p1][c1] = boost::any_cast<int32_t>(int32_type->deserialize(cell->as_atomic_cell().value()));
}
}
return true;

View File

@@ -148,7 +148,7 @@ public:
// FIXME: force limit count?
while (beg != end && count--) {
const column_definition& def = range.reversed ? *--end : *beg++;
atomic_cell_view cell = (*rw).at(def.id).as_atomic_cell();
atomic_cell_view cell = (*rw).cell_at(def.id).as_atomic_cell();
if (def.is_atomic()) {
if (cell.is_live()) { // FIXME: we should actually use tombstone information from all levels
Column col;