diff --git a/cql3/statements/delete_statement.cc b/cql3/statements/delete_statement.cc index 4ed0304a87..a0976ae6e2 100644 --- a/cql3/statements/delete_statement.cc +++ b/cql3/statements/delete_statement.cc @@ -30,7 +30,7 @@ namespace statements { void delete_statement::add_update_for_key(mutation& m, const exploded_clustering_prefix& prefix, const update_parameters& params) { if (_column_operations.empty()) { - m.partition().apply_delete(s, prefix, params.make_tombstone()); + m.partition().apply_delete(*s, prefix, params.make_tombstone()); return; } diff --git a/cql3/statements/update_statement.cc b/cql3/statements/update_statement.cc index a4ddd40a0f..84271786e5 100644 --- a/cql3/statements/update_statement.cc +++ b/cql3/statements/update_statement.cc @@ -59,7 +59,7 @@ void update_statement::add_update_for_key(mutation& m, const exploded_clustering } else { if (type == statement_type::INSERT && prefix) { auto& row = m.partition().clustered_row(clustering_key::from_clustering_prefix(*s, prefix)); - row.created_at = params.timestamp(); + row.apply(params.timestamp()); } } diff --git a/database.cc b/database.cc index 19b0fa14d4..f8d373aa92 100644 --- a/database.cc +++ b/database.cc @@ -56,7 +56,7 @@ column_family::find_partition(const dht::decorated_key& key) const { for (auto&& mt : _memtables) { auto mp = mt.find_partition(key); if (mp) { - ret.apply(_schema, *mp); + ret.apply(*_schema, *mp); any = true; } } @@ -157,7 +157,7 @@ column_family::for_all_partitions(Func&& func) const { } if (current) { // FIXME: handle different schemas - current->second.apply(_schema, mp); + current->second.apply(*_schema, mp); } else { current = std::make_pair(key, mp); } @@ -179,7 +179,7 @@ column_family::for_all_partitions_slow(std::function(def.type); - old = ct->merge(old.as_collection_mutation(), neww.as_collection_mutation()); - } -} - future> column_family::query(const query::read_command& cmd) const { query::result::builder builder(cmd.slice); diff --git a/keys.hh b/keys.hh index b815d8ffb0..38583ae1b5 100644 --- a/keys.hh +++ b/keys.hh @@ -118,9 +118,7 @@ protected: } public: static TopLevel make_empty(const schema& s) { - std::vector v; - v.resize(get_compound_type(s)->types().size()); - return from_exploded(s, v); + return from_exploded(s, {}); } static TopLevel from_exploded(const schema& s, const std::vector& v) { diff --git a/mutation.cc b/mutation.cc index 5273c638e5..bb82b5fac8 100644 --- a/mutation.cc +++ b/mutation.cc @@ -17,7 +17,7 @@ mutation::mutation(partition_key key_, schema_ptr schema) { } void mutation::set_static_cell(const column_definition& def, atomic_cell_or_collection value) { - update_column(_p.static_row(), def, std::move(value)); + _p.static_row().apply(def, std::move(value)); } void mutation::set_static_cell(const bytes& name, const boost::any& value, api::timestamp_type timestamp, ttl_opt ttl) { @@ -28,12 +28,12 @@ void mutation::set_static_cell(const bytes& name, const boost::any& value, api:: if (!column_def->is_static()) { throw std::runtime_error(sprint("column '%s' is not static", name)); } - update_column(_p.static_row(), *column_def, atomic_cell::make_live(timestamp, column_def->type->decompose(value), ttl)); + _p.static_row().apply(*column_def, atomic_cell::make_live(timestamp, column_def->type->decompose(value), ttl)); } void mutation::set_clustered_cell(const exploded_clustering_prefix& prefix, const column_definition& def, atomic_cell_or_collection value) { - auto& row = _p.clustered_row(clustering_key::from_clustering_prefix(*_schema, prefix)).cells; - update_column(row, def, std::move(value)); + auto& row = _p.clustered_row(clustering_key::from_clustering_prefix(*_schema, prefix)).cells(); + row.apply(def, std::move(value)); } void mutation::set_clustered_cell(const clustering_key& key, const bytes& name, const boost::any& value, @@ -46,8 +46,8 @@ void mutation::set_clustered_cell(const clustering_key& key, const bytes& name, } void mutation::set_clustered_cell(const clustering_key& key, const column_definition& def, atomic_cell_or_collection value) { - auto& row = _p.clustered_row(key).cells; - update_column(row, def, std::move(value)); + auto& row = _p.clustered_row(key).cells(); + row.apply(def, std::move(value)); } void mutation::set_cell(const exploded_clustering_prefix& prefix, const bytes& name, const boost::any& value, @@ -71,32 +71,19 @@ void mutation::set_cell(const exploded_clustering_prefix& prefix, const column_d std::experimental::optional mutation::get_cell(const clustering_key& rkey, const column_definition& def) const { - auto find_cell = [&def] (const row& r) { - auto i = r.find(def.id); - if (i == r.end()) { - return std::experimental::optional{}; - } - return std::experimental::optional{i->second}; - }; if (def.is_static()) { - return find_cell(_p.static_row()); + const atomic_cell_or_collection* cell = _p.static_row().find_cell(def.id); + if (!cell) { + return {}; + } + return { *cell }; } else { - auto r = _p.find_row(rkey); + const row* r = _p.find_row(rkey); if (!r) { return {}; } - return find_cell(*r); - } -} - -void mutation::update_column(row& row, const column_definition& def, atomic_cell_or_collection&& value) { - // our mutations are not yet immutable - auto id = def.id; - auto i = row.lower_bound(id); - if (i == row.end() || i->first != id) { - row.emplace_hint(i, id, std::move(value)); - } else { - merge_column(def, i->second, value); + const atomic_cell_or_collection* cell = r->find_cell(def.id); + return { *cell }; } } diff --git a/mutation.hh b/mutation.hh index 7b844fac26..fa0a8d7654 100644 --- a/mutation.hh +++ b/mutation.hh @@ -39,6 +39,5 @@ public: bool operator==(const mutation&) const; bool operator!=(const mutation&) const; private: - static void update_column(row& row, const column_definition& def, atomic_cell_or_collection&& value); friend std::ostream& operator<<(std::ostream& os, const mutation& m); }; diff --git a/mutation_partition.cc b/mutation_partition.cc index b4427e1e8c..ea7b693007 100644 --- a/mutation_partition.cc +++ b/mutation_partition.cc @@ -28,50 +28,42 @@ mutation_partition::operator=(const mutation_partition& x) { } void -mutation_partition::apply(schema_ptr schema, const mutation_partition& p) { +mutation_partition::apply(const schema& schema, const mutation_partition& p) { _tombstone.apply(p._tombstone); for (auto&& e : p._row_tombstones) { - apply_row_tombstone(*schema, e.prefix(), e.t()); + apply_row_tombstone(schema, e.prefix(), e.t()); } - auto merge_cells = [this, schema] (row& old_row, const row& new_row, auto&& find_column_def) { + static auto merge_cells = [] (row& old_row, const row& new_row, auto&& find_column_def) { for (auto&& new_column : new_row) { - auto col = new_column.first; - auto i = old_row.find(col); - if (i == old_row.end()) { - old_row.emplace_hint(i, new_column); - } else { - auto& old_column = *i; - auto& def = find_column_def(col); - merge_column(def, old_column.second, new_column.second); - } + old_row.apply(new_column.first, new_column.second, find_column_def); } }; - auto find_static_column_def = [schema] (auto col) -> const column_definition& { return schema->static_column_at(col); }; - auto find_regular_column_def = [schema] (auto col) -> const column_definition& { return schema->regular_column_at(col); }; + auto find_static_column_def = [&schema] (auto col) -> const column_definition& { return schema.static_column_at(col); }; + auto find_regular_column_def = [&schema] (auto col) -> const column_definition& { return schema.regular_column_at(col); }; merge_cells(_static_row, p._static_row, find_static_column_def); for (auto&& entry : p._rows) { auto& key = entry.key(); - auto i = _rows.find(key, rows_entry::compare(*schema)); + auto i = _rows.find(key, rows_entry::compare(schema)); if (i == _rows.end()) { auto e = new rows_entry(entry); _rows.insert(i, *e); } else { - i->row().t.apply(entry.row().t); - i->row().created_at = std::max(i->row().created_at, entry.row().created_at); - merge_cells(i->row().cells, entry.row().cells, find_regular_column_def); + i->row().apply(entry.row().deleted_at()); + i->row().apply(entry.row().created_at()); + merge_cells(i->row().cells(), entry.row().cells(), find_regular_column_def); } } } void -mutation_partition::apply(schema_ptr schema, mutation_partition_view p) { - mutation_partition_applier applier(*schema, *this); - p.accept(*schema, applier); +mutation_partition::apply(const schema& schema, mutation_partition_view p) { + mutation_partition_applier applier(schema, *this); + p.accept(schema, applier); } tombstone @@ -102,7 +94,7 @@ mutation_partition::tombstone_for_row(const schema& schema, const clustering_key auto j = _rows.find(key, rows_entry::compare(schema)); if (j != _rows.end()) { - t.apply(j->row().t); + t.apply(j->row().deleted_at()); } return t; @@ -111,7 +103,7 @@ mutation_partition::tombstone_for_row(const schema& schema, const clustering_key tombstone mutation_partition::tombstone_for_row(const schema& schema, const rows_entry& e) const { tombstone t = range_tombstone_for_row(schema, e.key()); - t.apply(e.row().t); + t.apply(e.row().deleted_at()); return t; } @@ -128,24 +120,24 @@ mutation_partition::apply_row_tombstone(const schema& schema, clustering_key_pre } void -mutation_partition::apply_delete(schema_ptr schema, const exploded_clustering_prefix& prefix, tombstone t) { +mutation_partition::apply_delete(const schema& schema, const exploded_clustering_prefix& prefix, tombstone t) { if (!prefix) { apply(t); - } else if (prefix.is_full(*schema)) { - apply_delete(schema, clustering_key::from_clustering_prefix(*schema, prefix), t); + } else if (prefix.is_full(schema)) { + apply_delete(schema, clustering_key::from_clustering_prefix(schema, prefix), t); } else { - apply_row_tombstone(*schema, clustering_key_prefix::from_clustering_prefix(*schema, prefix), t); + apply_row_tombstone(schema, clustering_key_prefix::from_clustering_prefix(schema, prefix), t); } } void -mutation_partition::apply_delete(schema_ptr schema, clustering_key&& key, tombstone t) { - clustered_row(*schema, std::move(key)).apply(t); +mutation_partition::apply_delete(const schema& schema, clustering_key&& key, tombstone t) { + clustered_row(schema, std::move(key)).apply(t); } void -mutation_partition::apply_delete(schema_ptr schema, clustering_key_view key, tombstone t) { - clustered_row(*schema, key).apply(t); +mutation_partition::apply_delete(const schema& schema, clustering_key_view key, tombstone t) { + clustered_row(schema, key).apply(t); } void @@ -154,8 +146,8 @@ mutation_partition::apply_insert(const schema& s, clustering_key_view key, api:: } const rows_entry* -mutation_partition::find_entry(schema_ptr schema, const clustering_key_prefix& key) const { - auto i = _rows.find(key, rows_entry::key_comparator(clustering_key::less_compare_with_prefix(*schema))); +mutation_partition::find_entry(const schema& schema, const clustering_key_prefix& key) const { + auto i = _rows.find(key, rows_entry::key_comparator(clustering_key::less_compare_with_prefix(schema))); if (i == _rows.end()) { return nullptr; } @@ -168,7 +160,7 @@ mutation_partition::find_row(const clustering_key& key) const { if (i == _rows.end()) { return nullptr; } - return &i->row().cells; + return &i->row().cells(); } deletable_row& @@ -228,22 +220,22 @@ template static void get_row_slice(const row& cells, const std::vector& columns, tombstone tomb, ColumnDefResolver&& id_to_def, query::result::row_writer& writer) { for (auto id : columns) { - auto i = cells.find(id); - if (i == cells.end()) { + const atomic_cell_or_collection* cell = cells.find_cell(id); + if (!cell) { writer.add_empty(); } else { auto&& def = id_to_def(id); if (def.is_atomic()) { - auto c = i->second.as_atomic_cell(); + auto c = cell->as_atomic_cell(); if (!c.is_live(tomb)) { writer.add_empty(); } else { - writer.add(i->second.as_atomic_cell()); + writer.add(cell->as_atomic_cell()); } } else { - auto&& cell = i->second.as_collection_mutation(); + auto&& mut = cell->as_collection_mutation(); auto&& ctype = static_pointer_cast(def.type); - auto m_view = ctype->deserialize_mutation_form(cell); + auto m_view = ctype->deserialize_mutation_form(mut); m_view.tomb.apply(tomb); auto m_ser = ctype->serialize_mutation_form_only_live(m_view); if (ctype->is_empty(m_ser)) { @@ -311,10 +303,10 @@ mutation_partition::query(const schema& s, // only one lookup for a full-tuple singular range though. for (const rows_entry& e : range(s, row_range)) { auto& row = e.row(); - auto&& cells = row.cells; + auto&& cells = row.cells(); auto row_tombstone = tombstone_for_row(s, e); - auto row_is_live = row.created_at > row_tombstone.timestamp; + auto row_is_live = row.created_at() > row_tombstone.timestamp; // row_is_live is true for rows created using 'insert' statement // which are not deleted yet. Such rows are considered as present @@ -348,7 +340,7 @@ operator<<(std::ostream& os, const row& r) { std::ostream& operator<<(std::ostream& os, const deletable_row& dr) { - return fprint(os, "{deletable_row: %s %s %s}", dr.created_at, dr.t, dr.cells); + return fprint(os, "{deletable_row: %s %s %s}", dr._created_at, dr._deleted_at, dr._cells); } std::ostream& @@ -378,10 +370,10 @@ rows_equal(const schema& s, const row& r1, const row& r2) { bool deletable_row::equal(const schema& s, const deletable_row& other) const { - if (t != other.t || created_at != other.created_at) { + if (_deleted_at != other._deleted_at || _created_at != other._created_at) { return false; } - return rows_equal(s, cells, other.cells); + return rows_equal(s, _cells, other._cells); } bool @@ -414,3 +406,44 @@ bool mutation_partition::equal(const schema& s, const mutation_partition& p) con return rows_equal(s, _static_row, p._static_row); } + +void +merge_column(const column_definition& def, + atomic_cell_or_collection& old, + const atomic_cell_or_collection& neww) { + if (def.is_atomic()) { + if (compare_atomic_cell_for_merge(old.as_atomic_cell(), neww.as_atomic_cell()) < 0) { + // FIXME: move()? + old = neww; + } + } else { + auto ct = static_pointer_cast(def.type); + old = ct->merge(old.as_collection_mutation(), neww.as_collection_mutation()); + } +} + +void +row::apply(const column_definition& column, atomic_cell_or_collection value) { + // our mutations are not yet immutable + auto id = column.id; + auto i = _cells.lower_bound(id); + if (i == _cells.end() || i->first != id) { + _cells.emplace_hint(i, id, std::move(value)); + } else { + merge_column(column, i->second, value); + } +} + +void +row::append_cell(column_id id, atomic_cell_or_collection value) { + _cells.emplace_hint(_cells.end(), id, std::move(value)); +} + +const atomic_cell_or_collection* +row::find_cell(column_id id) const { + auto i = _cells.find(id); + if (i == _cells.end()) { + return nullptr; + } + return &i->second; +} diff --git a/mutation_partition.hh b/mutation_partition.hh index 2729c9981c..651f123c69 100644 --- a/mutation_partition.hh +++ b/mutation_partition.hh @@ -15,28 +15,70 @@ #include "query-result-writer.hh" #include "mutation_partition_view.hh" -// FIXME: Encapsulate -using row = std::map; +// Container for cells of a row. Cells are identified by column_id. +// +// Can be used as a range of std::pair. +// +class row { + using map_type = std::map; + map_type _cells; +public: + using value_type = map_type::value_type; + using iterator = map_type::iterator; + using const_iterator = map_type::const_iterator; +public: + iterator begin() { return _cells.begin(); } + iterator end() { return _cells.end(); } + const_iterator begin() const { return _cells.begin(); } + const_iterator end() const { return _cells.end(); } + size_t size() const { return _cells.size(); } + + // Returns a reference to cell's value or throws std::out_of_range + const atomic_cell_or_collection& cell_at(column_id id) const { return _cells.at(id); } + + // Returns a pointer to cell's value or nullptr if column is not set. + const atomic_cell_or_collection* find_cell(column_id id) const; +public: + // Merges cell's value into the row. + void apply(const column_definition& column, atomic_cell_or_collection cell); + + // Adds cell to the row. The column must not be already set. + void append_cell(column_id id, atomic_cell_or_collection cell); + + // Merges given cell into the row. + template + void apply(column_id id, atomic_cell_or_collection cell, ColumnDefinitionResolver&& resolver) { + auto i = _cells.lower_bound(id); + if (i == _cells.end() || i->first != id) { + _cells.emplace_hint(i, id, std::move(cell)); + } else { + merge_column(resolver(id), i->second, std::move(cell)); + } + } +}; std::ostream& operator<<(std::ostream& os, const row::value_type& rv); std::ostream& operator<<(std::ostream& os, const row& r); -// FIXME: Encapsulate -struct deletable_row final { - tombstone t; - api::timestamp_type created_at = api::missing_timestamp; - row cells; +class deletable_row final { + tombstone _deleted_at; + api::timestamp_type _created_at = api::missing_timestamp; + row _cells; +public: + deletable_row() {} - void apply(tombstone t_) { - t.apply(t_); + void apply(tombstone deleted_at) { + _deleted_at.apply(deleted_at); } - void apply(api::timestamp_type new_created_at) { - if (new_created_at > created_at) { - created_at = new_created_at; - } + void apply(api::timestamp_type created_at) { + _created_at = std::max(_created_at, created_at); } - +public: + tombstone deleted_at() const { return _deleted_at; } + api::timestamp_type created_at() const { return _created_at; } + const row& cells() const { return _cells; } + row& cells() { return _cells; } friend std::ostream& operator<<(std::ostream& os, const deletable_row& dr); bool equal(const schema& s, const deletable_row& other) const; }; @@ -200,34 +242,36 @@ public: ~mutation_partition(); mutation_partition& operator=(const mutation_partition& x); mutation_partition& operator=(mutation_partition&& x) = default; - tombstone partition_tombstone() const { return _tombstone; } + bool equal(const schema& s, const mutation_partition&) const; + friend std::ostream& operator<<(std::ostream& os, const mutation_partition& mp); +public: void apply(tombstone t) { _tombstone.apply(t); } - void apply_delete(schema_ptr schema, const exploded_clustering_prefix& prefix, tombstone t); - void apply_delete(schema_ptr schema, clustering_key&& key, tombstone t); - void apply_delete(schema_ptr schema, clustering_key_view key, tombstone t); + void apply_delete(const schema& schema, const exploded_clustering_prefix& prefix, tombstone t); + void apply_delete(const schema& schema, clustering_key&& key, tombstone t); + void apply_delete(const schema& schema, clustering_key_view key, tombstone t); // Equivalent to applying a mutation with an empty row, created with given timestamp void apply_insert(const schema& s, clustering_key_view, api::timestamp_type created_at); // prefix must not be full void apply_row_tombstone(const schema& schema, clustering_key_prefix prefix, tombstone t); - void apply(schema_ptr schema, const mutation_partition& p); - void apply(schema_ptr schema, mutation_partition_view); - row& static_row() { return _static_row; } - // return a set of rows_entry where each entry represents a CQL row sharing the same clustering key. - const rows_type& clustered_rows() const { return _rows; } - const row& static_row() const { return _static_row; } - const row_tombstones_type& row_tombstones() const { return _row_tombstones; } + void apply(const schema& schema, const mutation_partition& p); + void apply(const schema& schema, mutation_partition_view); +public: deletable_row& clustered_row(const clustering_key& key); deletable_row& clustered_row(clustering_key&& key); deletable_row& clustered_row(const schema& s, const clustering_key_view& key); +public: + tombstone partition_tombstone() const { return _tombstone; } + row& static_row() { return _static_row; } + const row& static_row() const { return _static_row; } + // return a set of rows_entry where each entry represents a CQL row sharing the same clustering key. + const rows_type& clustered_rows() const { return _rows; } + const row_tombstones_type& row_tombstones() const { return _row_tombstones; } const row* find_row(const clustering_key& key) const; - const rows_entry* find_entry(schema_ptr schema, const clustering_key_prefix& key) const; + const rows_entry* find_entry(const schema& schema, const clustering_key_prefix& key) const; tombstone range_tombstone_for_row(const schema& schema, const clustering_key& key) const; tombstone tombstone_for_row(const schema& schema, const clustering_key& key) const; tombstone tombstone_for_row(const schema& schema, const rows_entry& e) const; - friend std::ostream& operator<<(std::ostream& os, const mutation_partition& mp); boost::iterator_range range(const schema& schema, const query::range& r) const; // Returns at most "limit" rows. The limit must be greater than 0. void query(const schema& s, const query::partition_slice& slice, uint32_t limit, query::result::partition_writer& pw) const; -public: - bool equal(const schema& s, const mutation_partition&) const; }; diff --git a/mutation_partition_applier.hh b/mutation_partition_applier.hh index 2933020048..0a222a0853 100644 --- a/mutation_partition_applier.hh +++ b/mutation_partition_applier.hh @@ -13,16 +13,6 @@ class mutation_partition_applier : public mutation_partition_visitor { const schema& _schema; mutation_partition& _p; deletable_row* _current_row; -private: - template - void apply_cell(row& r, column_id id, atomic_cell_or_collection c, ColumnIdResolver&& resolver) { - auto i = r.lower_bound(id); - if (i == r.end() || i->first != id) { - r.emplace_hint(i, id, std::move(c)); - } else { - merge_column(resolver(id), i->second, std::move(c)); - } - } public: mutation_partition_applier(const schema& s, mutation_partition& target) : _schema(s), _p(target) { } @@ -32,12 +22,12 @@ public: } virtual void accept_static_cell(column_id id, atomic_cell_view cell) override { - apply_cell(_p._static_row, id, atomic_cell_or_collection(cell), + _p._static_row.apply(id, atomic_cell_or_collection(cell), [this](column_id id) -> const column_definition& { return _schema.static_column_at(id); }); } virtual void accept_static_cell(column_id id, collection_mutation::view collection) override { - apply_cell(_p._static_row, id, atomic_cell_or_collection(collection), + _p._static_row.apply(id, atomic_cell_or_collection(collection), [this](column_id id) -> const column_definition& { return _schema.static_column_at(id); }); } @@ -53,12 +43,12 @@ public: } virtual void accept_row_cell(column_id id, atomic_cell_view cell) override { - apply_cell(_current_row->cells, id, atomic_cell_or_collection(cell), + _current_row->cells().apply(id, atomic_cell_or_collection(cell), [this](column_id id) -> const column_definition& { return _schema.regular_column_at(id); }); } virtual void accept_row_cell(column_id id, collection_mutation::view collection) override { - apply_cell(_current_row->cells, id, atomic_cell_or_collection(collection), + _current_row->cells().apply(id, atomic_cell_or_collection(collection), [this](column_id id) -> const column_definition& { return _schema.regular_column_at(id); }); } }; diff --git a/mutation_partition_serializer.cc b/mutation_partition_serializer.cc index 9ece33f182..2df70aacfe 100644 --- a/mutation_partition_serializer.cc +++ b/mutation_partition_serializer.cc @@ -53,9 +53,9 @@ mutation_partition_serializer::size(const schema& schema, const mutation_partiti for (const rows_entry& e : p.clustered_rows()) { size += clustering_key_view_serializer(e.key()).size(); size += sizeof(api::timestamp_type); // e.row().created_at - size += tombstone_serializer(e.row().t).size(); + size += tombstone_serializer(e.row().deleted_at()).size(); size += sizeof(count_type); // e.row().cells.size() - for (auto&& cell_entry : e.row().cells) { + for (auto&& cell_entry : e.row().cells()) { size += sizeof(column_id); const column_definition& def = schema.regular_column_at(cell_entry.first); if (def.is_atomic()) { @@ -96,10 +96,10 @@ mutation_partition_serializer::write(data_output& out, const schema& schema, con // rows for (const rows_entry& e : p.clustered_rows()) { clustering_key_view_serializer::write(out, e.key()); - out.write(e.row().created_at); - tombstone_serializer::write(out, e.row().t); - out.write(e.row().cells.size()); - for (auto&& cell_entry : e.row().cells) { + out.write(e.row().created_at()); + tombstone_serializer::write(out, e.row().deleted_at()); + out.write(e.row().cells().size()); + for (auto&& cell_entry : e.row().cells()) { out.write(cell_entry.first); const column_definition& def = schema.regular_column_at(cell_entry.first); if (def.is_atomic()) { diff --git a/partition_builder.hh b/partition_builder.hh index acc51f7481..1f6adece6d 100644 --- a/partition_builder.hh +++ b/partition_builder.hh @@ -27,12 +27,12 @@ public: virtual void accept_static_cell(column_id id, atomic_cell_view cell) override { row& r = _partition.static_row(); - r.emplace_hint(r.end(), id, atomic_cell_or_collection(cell)); + r.append_cell(id, atomic_cell_or_collection(cell)); } virtual void accept_static_cell(column_id id, collection_mutation::view collection) override { row& r = _partition.static_row(); - r.emplace_hint(r.end(), id, atomic_cell_or_collection(collection)); + r.append_cell(id, atomic_cell_or_collection(collection)); } virtual void accept_row_tombstone(clustering_key_prefix_view prefix, tombstone t) override { @@ -41,18 +41,18 @@ public: virtual void accept_row(clustering_key_view key, api::timestamp_type created_at, tombstone deleted_at) override { deletable_row& r = _partition.clustered_row(_schema, key); - r.created_at = created_at; + r.apply(created_at); r.apply(deleted_at); _current_row = &r; } virtual void accept_row_cell(column_id id, atomic_cell_view cell) override { - row& r = _current_row->cells; - r.emplace_hint(r.end(), id, atomic_cell_or_collection(cell)); + row& r = _current_row->cells(); + r.append_cell(id, atomic_cell_or_collection(cell)); } virtual void accept_row_cell(column_id id, collection_mutation::view collection) override { - row& r = _current_row->cells; - r.emplace_hint(r.end(), id, atomic_cell_or_collection(collection)); + row& r = _current_row->cells(); + r.append_cell(id, atomic_cell_or_collection(collection)); } }; diff --git a/query-request.hh b/query-request.hh index 54151f7cc5..1811d22928 100644 --- a/query-request.hh +++ b/query-request.hh @@ -119,6 +119,8 @@ std::ostream& operator<<(std::ostream& out, const range& r) { using partition_range = range; using clustering_range = range; +// Specifies subset of rows, columns and cell attributes to be returned in a query. +// Can be accessed across cores. class partition_slice { public: enum class option { send_clustering_key, send_partition_key, send_timestamp_and_expiry }; @@ -142,6 +144,9 @@ public: friend std::ostream& operator<<(std::ostream& out, const partition_slice& ps); }; +// Full specification of a query to the database. +// Intended for passing across replicas. +// Can be accessed across cores. class read_command { public: utils::UUID cf_id; diff --git a/sstables/partition.cc b/sstables/partition.cc index 345c3b8056..3a33d36d75 100644 --- a/sstables/partition.cc +++ b/sstables/partition.cc @@ -184,7 +184,7 @@ public: if (col.cell.size() == 0) { auto clustering_key = clustering_key::from_clustering_prefix(*_schema, clustering_prefix); auto& dr = mut->partition().clustered_row(clustering_key); - dr.created_at = timestamp; + dr.apply(timestamp); return; } diff --git a/sstables/sstables.cc b/sstables/sstables.cc index 5f3f6f0f50..31d54a04cf 100644 --- a/sstables/sstables.cc +++ b/sstables/sstables.cc @@ -933,14 +933,14 @@ static future<> write_cell(output_stream& out, atomic_cell_view cell) { static future<> write_row_marker(output_stream& out, const rows_entry& clustered_row, bytes& clustering_key) { // Missing created_at (api::missing_timestamp) means no row marker. - if (clustered_row.row().created_at == api::missing_timestamp) { + if (clustered_row.row().created_at() == api::missing_timestamp) { return make_ready_future<>(); } // Write row mark cell to the beginning of clustered row. return write_column_name(out, clustering_key, {}).then([&out, &clustered_row] { column_mask mask = column_mask::none; - uint64_t timestamp = clustered_row.row().created_at; + uint64_t timestamp = clustered_row.row().created_at(); uint32_t value_length = 0; return write(out, mask, timestamp, value_length); @@ -956,10 +956,10 @@ static future<> write_clustered_row(output_stream& out, schema_ptr schema, return write_row_marker(out, clustered_row, clustering_key).then( [&out, &clustered_row, schema, &clustering_key] { // FIXME: Before writing cells, range tombstone must be written if the row has any (deletable_row::t). - assert(!clustered_row.row().t); + assert(!clustered_row.row().deleted_at()); // Write all cells of a partition's row. - return do_for_each(clustered_row.row().cells, [&out, schema, &clustering_key] (auto& value) { + return do_for_each(clustered_row.row().cells(), [&out, schema, &clustering_key] (auto& value) { auto column_id = value.first; auto&& column_definition = schema->regular_column_at(column_id); // non atomic cell isn't supported yet. atomic cell maps to a single trift cell. diff --git a/tests/urchin/cql_test_env.cc b/tests/urchin/cql_test_env.cc index e72b574508..fd31746342 100644 --- a/tests/urchin/cql_test_env.cc +++ b/tests/urchin/cql_test_env.cc @@ -136,19 +136,19 @@ public: assert(row != nullptr); auto col_def = schema->get_column_definition(utf8_type->decompose(column_name)); assert(col_def != nullptr); - auto i = row->find(col_def->id); - if (i == row->end()) { + const atomic_cell_or_collection* cell = row->find_cell(col_def->id); + if (!cell) { assert(((void)"column not set", 0)); } bytes actual; if (!col_def->type->is_multi_cell()) { - auto cell = i->second.as_atomic_cell(); - assert(cell.is_live()); - actual = { cell.value().begin(), cell.value().end() }; + auto c = cell->as_atomic_cell(); + assert(c.is_live()); + actual = { c.value().begin(), c.value().end() }; } else { - auto cell = i->second.as_collection_mutation(); + auto c = cell->as_collection_mutation(); auto type = dynamic_pointer_cast(col_def->type); - actual = type->to_value(type->deserialize_mutation_form(cell), + actual = type->to_value(type->deserialize_mutation_form(c), serialization_format::internal()); } assert(col_def->type->equal(actual, col_def->type->decompose(expected))); diff --git a/tests/urchin/frozen_mutation_test.cc b/tests/urchin/frozen_mutation_test.cc index 3ff33159f7..ed2ce757f8 100644 --- a/tests/urchin/frozen_mutation_test.cc +++ b/tests/urchin/frozen_mutation_test.cc @@ -49,7 +49,7 @@ BOOST_AUTO_TEST_CASE(test_writing_and_reading) { test_freezing(m); - m.partition().apply_delete(s, ck2, new_tombstone()); + m.partition().apply_delete(*s, ck2, new_tombstone()); test_freezing(m); @@ -107,16 +107,16 @@ BOOST_AUTO_TEST_CASE(test_application_of_partition_view_has_the_same_effect_as_a m2.set_static_cell("static_1", bytes("val5"), new_timestamp()); mutation m_frozen(key, s); - m_frozen.partition().apply(s, freeze(m1).partition()); - m_frozen.partition().apply(s, freeze(m2).partition()); + m_frozen.partition().apply(*s, freeze(m1).partition()); + m_frozen.partition().apply(*s, freeze(m2).partition()); mutation m_unfrozen(key, s); - m_unfrozen.partition().apply(s, m1.partition()); - m_unfrozen.partition().apply(s, m2.partition()); + m_unfrozen.partition().apply(*s, m1.partition()); + m_unfrozen.partition().apply(*s, m2.partition()); mutation m_refrozen(key, s); - m_refrozen.partition().apply(s, freeze(m1).unfreeze(s).partition()); - m_refrozen.partition().apply(s, freeze(m2).unfreeze(s).partition()); + m_refrozen.partition().apply(*s, freeze(m1).unfreeze(s).partition()); + m_refrozen.partition().apply(*s, freeze(m2).unfreeze(s).partition()); assert_that(m_unfrozen).is_equal_to(m_refrozen); assert_that(m_unfrozen).is_equal_to(m_frozen); diff --git a/tests/urchin/mutation_test.cc b/tests/urchin/mutation_test.cc index bab9128100..6ab6fdfd4e 100644 --- a/tests/urchin/mutation_test.cc +++ b/tests/urchin/mutation_test.cc @@ -33,9 +33,9 @@ BOOST_AUTO_TEST_CASE(test_mutation_is_applied) { cf.apply(std::move(m)); row& r = cf.find_or_create_row_slow(key, c_key); - auto i = r.find(r1_col.id); - BOOST_REQUIRE(i != r.end()); - auto cell = i->second.as_atomic_cell(); + auto i = r.find_cell(r1_col.id); + BOOST_REQUIRE(i); + auto cell = i->as_atomic_cell(); BOOST_REQUIRE(cell.is_live()); BOOST_REQUIRE(int32_type->equal(cell.value(), int32_type->decompose(3))); } @@ -124,9 +124,9 @@ BOOST_AUTO_TEST_CASE(test_map_mutations) { cf.apply(m2o); row& r = cf.find_or_create_partition_slow(key).static_row(); - auto i = r.find(column.id); - BOOST_REQUIRE(i != r.end()); - auto cell = i->second.as_collection_mutation(); + auto i = r.find_cell(column.id); + BOOST_REQUIRE(i); + auto cell = i->as_collection_mutation(); auto muts = my_map_type->deserialize_mutation_form(cell); BOOST_REQUIRE(muts.cells.size() == 3); // FIXME: more strict tests @@ -157,9 +157,9 @@ BOOST_AUTO_TEST_CASE(test_set_mutations) { cf.apply(m2o); row& r = cf.find_or_create_partition_slow(key).static_row(); - auto i = r.find(column.id); - BOOST_REQUIRE(i != r.end()); - auto cell = i->second.as_collection_mutation(); + auto i = r.find_cell(column.id); + BOOST_REQUIRE(i); + auto cell = i->as_collection_mutation(); auto muts = my_set_type->deserialize_mutation_form(cell); BOOST_REQUIRE(muts.cells.size() == 3); // FIXME: more strict tests @@ -191,9 +191,9 @@ BOOST_AUTO_TEST_CASE(test_list_mutations) { cf.apply(m2o); row& r = cf.find_or_create_partition_slow(key).static_row(); - auto i = r.find(column.id); - BOOST_REQUIRE(i != r.end()); - auto cell = i->second.as_collection_mutation(); + auto i = r.find_cell(column.id); + BOOST_REQUIRE(i); + auto cell = i->as_collection_mutation(); auto muts = my_list_type->deserialize_mutation_form(cell); BOOST_REQUIRE(muts.cells.size() == 4); // FIXME: more strict tests @@ -223,9 +223,9 @@ BOOST_AUTO_TEST_CASE(test_multiple_memtables_one_partition) { auto c_key = clustering_key::from_exploded(*s, {int32_type->decompose(c1)}); auto r = cf.find_row(dht::global_partitioner().decorate_key(*s, key), c_key); BOOST_REQUIRE(r); - auto i = r->find(r1_col.id); - BOOST_REQUIRE(i != r->end()); - auto cell = i->second.as_atomic_cell(); + auto i = r->find_cell(r1_col.id); + BOOST_REQUIRE(i); + auto cell = i->as_atomic_cell(); BOOST_REQUIRE(cell.is_live()); BOOST_REQUIRE(int32_type->equal(cell.value(), int32_type->decompose(r1))); }; @@ -267,9 +267,9 @@ BOOST_AUTO_TEST_CASE(test_multiple_memtables_multiple_partitions) { auto p1 = boost::any_cast(int32_type->deserialize(pk._key.explode(*s)[0])); for (const rows_entry& re : mp.range(*s, query::range())) { auto c1 = boost::any_cast(int32_type->deserialize(re.key().explode(*s)[0])); - auto i = re.row().cells.find(r1_col.id); - if (i != re.row().cells.end()) { - result[p1][c1] = boost::any_cast(int32_type->deserialize(i->second.as_atomic_cell().value())); + auto cell = re.row().cells().find_cell(r1_col.id); + if (cell) { + result[p1][c1] = boost::any_cast(int32_type->deserialize(cell->as_atomic_cell().value())); } } return true; diff --git a/thrift/handler.cc b/thrift/handler.cc index 593ab5ecb0..974af2bcd5 100644 --- a/thrift/handler.cc +++ b/thrift/handler.cc @@ -148,7 +148,7 @@ public: // FIXME: force limit count? while (beg != end && count--) { const column_definition& def = range.reversed ? *--end : *beg++; - atomic_cell_view cell = (*rw).at(def.id).as_atomic_cell(); + atomic_cell_view cell = (*rw).cell_at(def.id).as_atomic_cell(); if (def.is_atomic()) { if (cell.is_live()) { // FIXME: we should actually use tombstone information from all levels Column col;