From 431958c621ae630f76e6da1e84e99a7db127d52d Mon Sep 17 00:00:00 2001 From: Tomasz Grabiec Date: Mon, 11 May 2015 11:43:47 +0200 Subject: [PATCH 1/6] keys: Fix make_empty() make_empty() is used from thrift to create a clustering_key for a table's row without clustering key columns. The implementation was misleading because it seemed to be handling any number of components in the key while only no-component case is supposed to work. --- keys.hh | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/keys.hh b/keys.hh index b815d8ffb0..38583ae1b5 100644 --- a/keys.hh +++ b/keys.hh @@ -118,9 +118,7 @@ protected: } public: static TopLevel make_empty(const schema& s) { - std::vector v; - v.resize(get_compound_type(s)->types().size()); - return from_exploded(s, v); + return from_exploded(s, {}); } static TopLevel from_exploded(const schema& s, const std::vector& v) { From a5ff3818f7a4ff911343a1c181876af8ab839d53 Mon Sep 17 00:00:00 2001 From: Tomasz Grabiec Date: Fri, 8 May 2015 14:03:12 +0200 Subject: [PATCH 2/6] mutation_partition: Visually segregate mutators from getters --- mutation_partition.hh | 20 +++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/mutation_partition.hh b/mutation_partition.hh index 2729c9981c..181712f9af 100644 --- a/mutation_partition.hh +++ b/mutation_partition.hh @@ -200,7 +200,9 @@ public: ~mutation_partition(); mutation_partition& operator=(const mutation_partition& x); mutation_partition& operator=(mutation_partition&& x) = default; - tombstone partition_tombstone() const { return _tombstone; } + bool equal(const schema& s, const mutation_partition&) const; + friend std::ostream& operator<<(std::ostream& os, const mutation_partition& mp); +public: void apply(tombstone t) { _tombstone.apply(t); } void apply_delete(schema_ptr schema, const exploded_clustering_prefix& prefix, tombstone t); void apply_delete(schema_ptr schema, clustering_key&& key, tombstone t); @@ -211,23 +213,23 @@ public: void apply_row_tombstone(const schema& schema, clustering_key_prefix prefix, tombstone t); void apply(schema_ptr schema, const mutation_partition& p); void apply(schema_ptr schema, mutation_partition_view); - row& static_row() { return _static_row; } - // return a set of rows_entry where each entry represents a CQL row sharing the same clustering key. - const rows_type& clustered_rows() const { return _rows; } - const row& static_row() const { return _static_row; } - const row_tombstones_type& row_tombstones() const { return _row_tombstones; } +public: deletable_row& clustered_row(const clustering_key& key); deletable_row& clustered_row(clustering_key&& key); deletable_row& clustered_row(const schema& s, const clustering_key_view& key); +public: + tombstone partition_tombstone() const { return _tombstone; } + row& static_row() { return _static_row; } + const row& static_row() const { return _static_row; } + // return a set of rows_entry where each entry represents a CQL row sharing the same clustering key. + const rows_type& clustered_rows() const { return _rows; } + const row_tombstones_type& row_tombstones() const { return _row_tombstones; } const row* find_row(const clustering_key& key) const; const rows_entry* find_entry(schema_ptr schema, const clustering_key_prefix& key) const; tombstone range_tombstone_for_row(const schema& schema, const clustering_key& key) const; tombstone tombstone_for_row(const schema& schema, const clustering_key& key) const; tombstone tombstone_for_row(const schema& schema, const rows_entry& e) const; - friend std::ostream& operator<<(std::ostream& os, const mutation_partition& mp); boost::iterator_range range(const schema& schema, const query::range& r) const; // Returns at most "limit" rows. The limit must be greater than 0. void query(const schema& s, const query::partition_slice& slice, uint32_t limit, query::result::partition_writer& pw) const; -public: - bool equal(const schema& s, const mutation_partition&) const; }; From 56bea440a79609f93682f07be8f6d7a57ed0ba55 Mon Sep 17 00:00:00 2001 From: Tomasz Grabiec Date: Fri, 8 May 2015 14:41:08 +0200 Subject: [PATCH 3/6] mutation_partition: Pass schema by const& where applicable If method doesn't want to share schema ownership it doesn't have to take it by shared pointer. The benefit is that it's slightly cheaper and those methods may now be called from places which don't own schema. --- cql3/statements/delete_statement.cc | 2 +- database.cc | 8 +++--- mutation_partition.cc | 38 ++++++++++++++-------------- mutation_partition.hh | 12 ++++----- tests/urchin/frozen_mutation_test.cc | 14 +++++----- 5 files changed, 37 insertions(+), 37 deletions(-) diff --git a/cql3/statements/delete_statement.cc b/cql3/statements/delete_statement.cc index 4ed0304a87..a0976ae6e2 100644 --- a/cql3/statements/delete_statement.cc +++ b/cql3/statements/delete_statement.cc @@ -30,7 +30,7 @@ namespace statements { void delete_statement::add_update_for_key(mutation& m, const exploded_clustering_prefix& prefix, const update_parameters& params) { if (_column_operations.empty()) { - m.partition().apply_delete(s, prefix, params.make_tombstone()); + m.partition().apply_delete(*s, prefix, params.make_tombstone()); return; } diff --git a/database.cc b/database.cc index 19b0fa14d4..d27c5bb29b 100644 --- a/database.cc +++ b/database.cc @@ -56,7 +56,7 @@ column_family::find_partition(const dht::decorated_key& key) const { for (auto&& mt : _memtables) { auto mp = mt.find_partition(key); if (mp) { - ret.apply(_schema, *mp); + ret.apply(*_schema, *mp); any = true; } } @@ -157,7 +157,7 @@ column_family::for_all_partitions(Func&& func) const { } if (current) { // FIXME: handle different schemas - current->second.apply(_schema, mp); + current->second.apply(*_schema, mp); } else { current = std::make_pair(key, mp); } @@ -555,13 +555,13 @@ database::find_or_create_keyspace(const sstring& name) { void memtable::apply(const mutation& m) { mutation_partition& p = find_or_create_partition(m.decorated_key()); - p.apply(_schema, m.partition()); + p.apply(*_schema, m.partition()); } void memtable::apply(const frozen_mutation& m) { mutation_partition& p = find_or_create_partition_slow(m.key(*_schema)); - p.apply(_schema, m.partition()); + p.apply(*_schema, m.partition()); } // Based on: diff --git a/mutation_partition.cc b/mutation_partition.cc index b4427e1e8c..02c5dacab8 100644 --- a/mutation_partition.cc +++ b/mutation_partition.cc @@ -28,14 +28,14 @@ mutation_partition::operator=(const mutation_partition& x) { } void -mutation_partition::apply(schema_ptr schema, const mutation_partition& p) { +mutation_partition::apply(const schema& schema, const mutation_partition& p) { _tombstone.apply(p._tombstone); for (auto&& e : p._row_tombstones) { - apply_row_tombstone(*schema, e.prefix(), e.t()); + apply_row_tombstone(schema, e.prefix(), e.t()); } - auto merge_cells = [this, schema] (row& old_row, const row& new_row, auto&& find_column_def) { + static auto merge_cells = [] (row& old_row, const row& new_row, auto&& find_column_def) { for (auto&& new_column : new_row) { auto col = new_column.first; auto i = old_row.find(col); @@ -49,14 +49,14 @@ mutation_partition::apply(schema_ptr schema, const mutation_partition& p) { } }; - auto find_static_column_def = [schema] (auto col) -> const column_definition& { return schema->static_column_at(col); }; - auto find_regular_column_def = [schema] (auto col) -> const column_definition& { return schema->regular_column_at(col); }; + auto find_static_column_def = [&schema] (auto col) -> const column_definition& { return schema.static_column_at(col); }; + auto find_regular_column_def = [&schema] (auto col) -> const column_definition& { return schema.regular_column_at(col); }; merge_cells(_static_row, p._static_row, find_static_column_def); for (auto&& entry : p._rows) { auto& key = entry.key(); - auto i = _rows.find(key, rows_entry::compare(*schema)); + auto i = _rows.find(key, rows_entry::compare(schema)); if (i == _rows.end()) { auto e = new rows_entry(entry); _rows.insert(i, *e); @@ -69,9 +69,9 @@ mutation_partition::apply(schema_ptr schema, const mutation_partition& p) { } void -mutation_partition::apply(schema_ptr schema, mutation_partition_view p) { - mutation_partition_applier applier(*schema, *this); - p.accept(*schema, applier); +mutation_partition::apply(const schema& schema, mutation_partition_view p) { + mutation_partition_applier applier(schema, *this); + p.accept(schema, applier); } tombstone @@ -128,24 +128,24 @@ mutation_partition::apply_row_tombstone(const schema& schema, clustering_key_pre } void -mutation_partition::apply_delete(schema_ptr schema, const exploded_clustering_prefix& prefix, tombstone t) { +mutation_partition::apply_delete(const schema& schema, const exploded_clustering_prefix& prefix, tombstone t) { if (!prefix) { apply(t); - } else if (prefix.is_full(*schema)) { - apply_delete(schema, clustering_key::from_clustering_prefix(*schema, prefix), t); + } else if (prefix.is_full(schema)) { + apply_delete(schema, clustering_key::from_clustering_prefix(schema, prefix), t); } else { - apply_row_tombstone(*schema, clustering_key_prefix::from_clustering_prefix(*schema, prefix), t); + apply_row_tombstone(schema, clustering_key_prefix::from_clustering_prefix(schema, prefix), t); } } void -mutation_partition::apply_delete(schema_ptr schema, clustering_key&& key, tombstone t) { - clustered_row(*schema, std::move(key)).apply(t); +mutation_partition::apply_delete(const schema& schema, clustering_key&& key, tombstone t) { + clustered_row(schema, std::move(key)).apply(t); } void -mutation_partition::apply_delete(schema_ptr schema, clustering_key_view key, tombstone t) { - clustered_row(*schema, key).apply(t); +mutation_partition::apply_delete(const schema& schema, clustering_key_view key, tombstone t) { + clustered_row(schema, key).apply(t); } void @@ -154,8 +154,8 @@ mutation_partition::apply_insert(const schema& s, clustering_key_view key, api:: } const rows_entry* -mutation_partition::find_entry(schema_ptr schema, const clustering_key_prefix& key) const { - auto i = _rows.find(key, rows_entry::key_comparator(clustering_key::less_compare_with_prefix(*schema))); +mutation_partition::find_entry(const schema& schema, const clustering_key_prefix& key) const { + auto i = _rows.find(key, rows_entry::key_comparator(clustering_key::less_compare_with_prefix(schema))); if (i == _rows.end()) { return nullptr; } diff --git a/mutation_partition.hh b/mutation_partition.hh index 181712f9af..56410ad276 100644 --- a/mutation_partition.hh +++ b/mutation_partition.hh @@ -204,15 +204,15 @@ public: friend std::ostream& operator<<(std::ostream& os, const mutation_partition& mp); public: void apply(tombstone t) { _tombstone.apply(t); } - void apply_delete(schema_ptr schema, const exploded_clustering_prefix& prefix, tombstone t); - void apply_delete(schema_ptr schema, clustering_key&& key, tombstone t); - void apply_delete(schema_ptr schema, clustering_key_view key, tombstone t); + void apply_delete(const schema& schema, const exploded_clustering_prefix& prefix, tombstone t); + void apply_delete(const schema& schema, clustering_key&& key, tombstone t); + void apply_delete(const schema& schema, clustering_key_view key, tombstone t); // Equivalent to applying a mutation with an empty row, created with given timestamp void apply_insert(const schema& s, clustering_key_view, api::timestamp_type created_at); // prefix must not be full void apply_row_tombstone(const schema& schema, clustering_key_prefix prefix, tombstone t); - void apply(schema_ptr schema, const mutation_partition& p); - void apply(schema_ptr schema, mutation_partition_view); + void apply(const schema& schema, const mutation_partition& p); + void apply(const schema& schema, mutation_partition_view); public: deletable_row& clustered_row(const clustering_key& key); deletable_row& clustered_row(clustering_key&& key); @@ -225,7 +225,7 @@ public: const rows_type& clustered_rows() const { return _rows; } const row_tombstones_type& row_tombstones() const { return _row_tombstones; } const row* find_row(const clustering_key& key) const; - const rows_entry* find_entry(schema_ptr schema, const clustering_key_prefix& key) const; + const rows_entry* find_entry(const schema& schema, const clustering_key_prefix& key) const; tombstone range_tombstone_for_row(const schema& schema, const clustering_key& key) const; tombstone tombstone_for_row(const schema& schema, const clustering_key& key) const; tombstone tombstone_for_row(const schema& schema, const rows_entry& e) const; diff --git a/tests/urchin/frozen_mutation_test.cc b/tests/urchin/frozen_mutation_test.cc index 3ff33159f7..ed2ce757f8 100644 --- a/tests/urchin/frozen_mutation_test.cc +++ b/tests/urchin/frozen_mutation_test.cc @@ -49,7 +49,7 @@ BOOST_AUTO_TEST_CASE(test_writing_and_reading) { test_freezing(m); - m.partition().apply_delete(s, ck2, new_tombstone()); + m.partition().apply_delete(*s, ck2, new_tombstone()); test_freezing(m); @@ -107,16 +107,16 @@ BOOST_AUTO_TEST_CASE(test_application_of_partition_view_has_the_same_effect_as_a m2.set_static_cell("static_1", bytes("val5"), new_timestamp()); mutation m_frozen(key, s); - m_frozen.partition().apply(s, freeze(m1).partition()); - m_frozen.partition().apply(s, freeze(m2).partition()); + m_frozen.partition().apply(*s, freeze(m1).partition()); + m_frozen.partition().apply(*s, freeze(m2).partition()); mutation m_unfrozen(key, s); - m_unfrozen.partition().apply(s, m1.partition()); - m_unfrozen.partition().apply(s, m2.partition()); + m_unfrozen.partition().apply(*s, m1.partition()); + m_unfrozen.partition().apply(*s, m2.partition()); mutation m_refrozen(key, s); - m_refrozen.partition().apply(s, freeze(m1).unfreeze(s).partition()); - m_refrozen.partition().apply(s, freeze(m2).unfreeze(s).partition()); + m_refrozen.partition().apply(*s, freeze(m1).unfreeze(s).partition()); + m_refrozen.partition().apply(*s, freeze(m2).unfreeze(s).partition()); assert_that(m_unfrozen).is_equal_to(m_refrozen); assert_that(m_unfrozen).is_equal_to(m_frozen); From dbc40dfb095df57f7edd84d2769940f2015ce748 Mon Sep 17 00:00:00 2001 From: Tomasz Grabiec Date: Mon, 11 May 2015 11:47:21 +0200 Subject: [PATCH 4/6] db: Encapsulate the "row" class Reduces coupling. User's should not rely on the fact that it's an std::map<>. It also allows us to extend row's interface with domain-specific methods, which are a lot easier to discover than free functions. --- database.cc | 15 --------- mutation.cc | 37 +++++++------------- mutation.hh | 1 - mutation_partition.cc | 63 ++++++++++++++++++++++++++--------- mutation_partition.hh | 43 ++++++++++++++++++++++-- mutation_partition_applier.hh | 18 +++------- partition_builder.hh | 8 ++--- tests/urchin/cql_test_env.cc | 14 ++++---- tests/urchin/mutation_test.cc | 36 ++++++++++---------- thrift/handler.cc | 2 +- 10 files changed, 135 insertions(+), 102 deletions(-) diff --git a/database.cc b/database.cc index d27c5bb29b..6b233af4c0 100644 --- a/database.cc +++ b/database.cc @@ -601,21 +601,6 @@ compare_atomic_cell_for_merge(atomic_cell_view left, atomic_cell_view right) { return 0; } -void -merge_column(const column_definition& def, - atomic_cell_or_collection& old, - const atomic_cell_or_collection& neww) { - if (def.is_atomic()) { - if (compare_atomic_cell_for_merge(old.as_atomic_cell(), neww.as_atomic_cell()) < 0) { - // FIXME: move()? - old = neww; - } - } else { - auto ct = static_pointer_cast(def.type); - old = ct->merge(old.as_collection_mutation(), neww.as_collection_mutation()); - } -} - future> column_family::query(const query::read_command& cmd) const { query::result::builder builder(cmd.slice); diff --git a/mutation.cc b/mutation.cc index 5273c638e5..b1686007eb 100644 --- a/mutation.cc +++ b/mutation.cc @@ -17,7 +17,7 @@ mutation::mutation(partition_key key_, schema_ptr schema) { } void mutation::set_static_cell(const column_definition& def, atomic_cell_or_collection value) { - update_column(_p.static_row(), def, std::move(value)); + _p.static_row().apply(def, std::move(value)); } void mutation::set_static_cell(const bytes& name, const boost::any& value, api::timestamp_type timestamp, ttl_opt ttl) { @@ -28,12 +28,12 @@ void mutation::set_static_cell(const bytes& name, const boost::any& value, api:: if (!column_def->is_static()) { throw std::runtime_error(sprint("column '%s' is not static", name)); } - update_column(_p.static_row(), *column_def, atomic_cell::make_live(timestamp, column_def->type->decompose(value), ttl)); + _p.static_row().apply(*column_def, atomic_cell::make_live(timestamp, column_def->type->decompose(value), ttl)); } void mutation::set_clustered_cell(const exploded_clustering_prefix& prefix, const column_definition& def, atomic_cell_or_collection value) { auto& row = _p.clustered_row(clustering_key::from_clustering_prefix(*_schema, prefix)).cells; - update_column(row, def, std::move(value)); + row.apply(def, std::move(value)); } void mutation::set_clustered_cell(const clustering_key& key, const bytes& name, const boost::any& value, @@ -47,7 +47,7 @@ void mutation::set_clustered_cell(const clustering_key& key, const bytes& name, void mutation::set_clustered_cell(const clustering_key& key, const column_definition& def, atomic_cell_or_collection value) { auto& row = _p.clustered_row(key).cells; - update_column(row, def, std::move(value)); + row.apply(def, std::move(value)); } void mutation::set_cell(const exploded_clustering_prefix& prefix, const bytes& name, const boost::any& value, @@ -71,32 +71,19 @@ void mutation::set_cell(const exploded_clustering_prefix& prefix, const column_d std::experimental::optional mutation::get_cell(const clustering_key& rkey, const column_definition& def) const { - auto find_cell = [&def] (const row& r) { - auto i = r.find(def.id); - if (i == r.end()) { - return std::experimental::optional{}; - } - return std::experimental::optional{i->second}; - }; if (def.is_static()) { - return find_cell(_p.static_row()); + const atomic_cell_or_collection* cell = _p.static_row().find_cell(def.id); + if (!cell) { + return {}; + } + return { *cell }; } else { - auto r = _p.find_row(rkey); + const row* r = _p.find_row(rkey); if (!r) { return {}; } - return find_cell(*r); - } -} - -void mutation::update_column(row& row, const column_definition& def, atomic_cell_or_collection&& value) { - // our mutations are not yet immutable - auto id = def.id; - auto i = row.lower_bound(id); - if (i == row.end() || i->first != id) { - row.emplace_hint(i, id, std::move(value)); - } else { - merge_column(def, i->second, value); + const atomic_cell_or_collection* cell = r->find_cell(def.id); + return { *cell }; } } diff --git a/mutation.hh b/mutation.hh index 7b844fac26..fa0a8d7654 100644 --- a/mutation.hh +++ b/mutation.hh @@ -39,6 +39,5 @@ public: bool operator==(const mutation&) const; bool operator!=(const mutation&) const; private: - static void update_column(row& row, const column_definition& def, atomic_cell_or_collection&& value); friend std::ostream& operator<<(std::ostream& os, const mutation& m); }; diff --git a/mutation_partition.cc b/mutation_partition.cc index 02c5dacab8..89d34f1999 100644 --- a/mutation_partition.cc +++ b/mutation_partition.cc @@ -37,15 +37,7 @@ mutation_partition::apply(const schema& schema, const mutation_partition& p) { static auto merge_cells = [] (row& old_row, const row& new_row, auto&& find_column_def) { for (auto&& new_column : new_row) { - auto col = new_column.first; - auto i = old_row.find(col); - if (i == old_row.end()) { - old_row.emplace_hint(i, new_column); - } else { - auto& old_column = *i; - auto& def = find_column_def(col); - merge_column(def, old_column.second, new_column.second); - } + old_row.apply(new_column.first, new_column.second, find_column_def); } }; @@ -228,22 +220,22 @@ template static void get_row_slice(const row& cells, const std::vector& columns, tombstone tomb, ColumnDefResolver&& id_to_def, query::result::row_writer& writer) { for (auto id : columns) { - auto i = cells.find(id); - if (i == cells.end()) { + const atomic_cell_or_collection* cell = cells.find_cell(id); + if (!cell) { writer.add_empty(); } else { auto&& def = id_to_def(id); if (def.is_atomic()) { - auto c = i->second.as_atomic_cell(); + auto c = cell->as_atomic_cell(); if (!c.is_live(tomb)) { writer.add_empty(); } else { - writer.add(i->second.as_atomic_cell()); + writer.add(cell->as_atomic_cell()); } } else { - auto&& cell = i->second.as_collection_mutation(); + auto&& mut = cell->as_collection_mutation(); auto&& ctype = static_pointer_cast(def.type); - auto m_view = ctype->deserialize_mutation_form(cell); + auto m_view = ctype->deserialize_mutation_form(mut); m_view.tomb.apply(tomb); auto m_ser = ctype->serialize_mutation_form_only_live(m_view); if (ctype->is_empty(m_ser)) { @@ -414,3 +406,44 @@ bool mutation_partition::equal(const schema& s, const mutation_partition& p) con return rows_equal(s, _static_row, p._static_row); } + +void +merge_column(const column_definition& def, + atomic_cell_or_collection& old, + const atomic_cell_or_collection& neww) { + if (def.is_atomic()) { + if (compare_atomic_cell_for_merge(old.as_atomic_cell(), neww.as_atomic_cell()) < 0) { + // FIXME: move()? + old = neww; + } + } else { + auto ct = static_pointer_cast(def.type); + old = ct->merge(old.as_collection_mutation(), neww.as_collection_mutation()); + } +} + +void +row::apply(const column_definition& column, atomic_cell_or_collection value) { + // our mutations are not yet immutable + auto id = column.id; + auto i = _cells.lower_bound(id); + if (i == _cells.end() || i->first != id) { + _cells.emplace_hint(i, id, std::move(value)); + } else { + merge_column(column, i->second, value); + } +} + +void +row::append_cell(column_id id, atomic_cell_or_collection value) { + _cells.emplace_hint(_cells.end(), id, std::move(value)); +} + +const atomic_cell_or_collection* +row::find_cell(column_id id) const { + auto i = _cells.find(id); + if (i == _cells.end()) { + return nullptr; + } + return &i->second; +} diff --git a/mutation_partition.hh b/mutation_partition.hh index 56410ad276..79cfa41357 100644 --- a/mutation_partition.hh +++ b/mutation_partition.hh @@ -15,8 +15,47 @@ #include "query-result-writer.hh" #include "mutation_partition_view.hh" -// FIXME: Encapsulate -using row = std::map; +// Container for cells of a row. Cells are identified by column_id. +// +// Can be used as a range of std::pair. +// +class row { + using map_type = std::map; + map_type _cells; +public: + using value_type = map_type::value_type; + using iterator = map_type::iterator; + using const_iterator = map_type::const_iterator; +public: + iterator begin() { return _cells.begin(); } + iterator end() { return _cells.end(); } + const_iterator begin() const { return _cells.begin(); } + const_iterator end() const { return _cells.end(); } + size_t size() const { return _cells.size(); } + + // Returns a reference to cell's value or throws std::out_of_range + const atomic_cell_or_collection& cell_at(column_id id) const { return _cells.at(id); } + + // Returns a pointer to cell's value or nullptr if column is not set. + const atomic_cell_or_collection* find_cell(column_id id) const; +public: + // Merges cell's value into the row. + void apply(const column_definition& column, atomic_cell_or_collection cell); + + // Adds cell to the row. The column must not be already set. + void append_cell(column_id id, atomic_cell_or_collection cell); + + // Merges given cell into the row. + template + void apply(column_id id, atomic_cell_or_collection cell, ColumnDefinitionResolver&& resolver) { + auto i = _cells.lower_bound(id); + if (i == _cells.end() || i->first != id) { + _cells.emplace_hint(i, id, std::move(cell)); + } else { + merge_column(resolver(id), i->second, std::move(cell)); + } + } +}; std::ostream& operator<<(std::ostream& os, const row::value_type& rv); std::ostream& operator<<(std::ostream& os, const row& r); diff --git a/mutation_partition_applier.hh b/mutation_partition_applier.hh index 2933020048..e12051a427 100644 --- a/mutation_partition_applier.hh +++ b/mutation_partition_applier.hh @@ -13,16 +13,6 @@ class mutation_partition_applier : public mutation_partition_visitor { const schema& _schema; mutation_partition& _p; deletable_row* _current_row; -private: - template - void apply_cell(row& r, column_id id, atomic_cell_or_collection c, ColumnIdResolver&& resolver) { - auto i = r.lower_bound(id); - if (i == r.end() || i->first != id) { - r.emplace_hint(i, id, std::move(c)); - } else { - merge_column(resolver(id), i->second, std::move(c)); - } - } public: mutation_partition_applier(const schema& s, mutation_partition& target) : _schema(s), _p(target) { } @@ -32,12 +22,12 @@ public: } virtual void accept_static_cell(column_id id, atomic_cell_view cell) override { - apply_cell(_p._static_row, id, atomic_cell_or_collection(cell), + _p._static_row.apply(id, atomic_cell_or_collection(cell), [this](column_id id) -> const column_definition& { return _schema.static_column_at(id); }); } virtual void accept_static_cell(column_id id, collection_mutation::view collection) override { - apply_cell(_p._static_row, id, atomic_cell_or_collection(collection), + _p._static_row.apply(id, atomic_cell_or_collection(collection), [this](column_id id) -> const column_definition& { return _schema.static_column_at(id); }); } @@ -53,12 +43,12 @@ public: } virtual void accept_row_cell(column_id id, atomic_cell_view cell) override { - apply_cell(_current_row->cells, id, atomic_cell_or_collection(cell), + _current_row->cells.apply(id, atomic_cell_or_collection(cell), [this](column_id id) -> const column_definition& { return _schema.regular_column_at(id); }); } virtual void accept_row_cell(column_id id, collection_mutation::view collection) override { - apply_cell(_current_row->cells, id, atomic_cell_or_collection(collection), + _current_row->cells.apply(id, atomic_cell_or_collection(collection), [this](column_id id) -> const column_definition& { return _schema.regular_column_at(id); }); } }; diff --git a/partition_builder.hh b/partition_builder.hh index acc51f7481..5bb6261c62 100644 --- a/partition_builder.hh +++ b/partition_builder.hh @@ -27,12 +27,12 @@ public: virtual void accept_static_cell(column_id id, atomic_cell_view cell) override { row& r = _partition.static_row(); - r.emplace_hint(r.end(), id, atomic_cell_or_collection(cell)); + r.append_cell(id, atomic_cell_or_collection(cell)); } virtual void accept_static_cell(column_id id, collection_mutation::view collection) override { row& r = _partition.static_row(); - r.emplace_hint(r.end(), id, atomic_cell_or_collection(collection)); + r.append_cell(id, atomic_cell_or_collection(collection)); } virtual void accept_row_tombstone(clustering_key_prefix_view prefix, tombstone t) override { @@ -48,11 +48,11 @@ public: virtual void accept_row_cell(column_id id, atomic_cell_view cell) override { row& r = _current_row->cells; - r.emplace_hint(r.end(), id, atomic_cell_or_collection(cell)); + r.append_cell(id, atomic_cell_or_collection(cell)); } virtual void accept_row_cell(column_id id, collection_mutation::view collection) override { row& r = _current_row->cells; - r.emplace_hint(r.end(), id, atomic_cell_or_collection(collection)); + r.append_cell(id, atomic_cell_or_collection(collection)); } }; diff --git a/tests/urchin/cql_test_env.cc b/tests/urchin/cql_test_env.cc index e72b574508..fd31746342 100644 --- a/tests/urchin/cql_test_env.cc +++ b/tests/urchin/cql_test_env.cc @@ -136,19 +136,19 @@ public: assert(row != nullptr); auto col_def = schema->get_column_definition(utf8_type->decompose(column_name)); assert(col_def != nullptr); - auto i = row->find(col_def->id); - if (i == row->end()) { + const atomic_cell_or_collection* cell = row->find_cell(col_def->id); + if (!cell) { assert(((void)"column not set", 0)); } bytes actual; if (!col_def->type->is_multi_cell()) { - auto cell = i->second.as_atomic_cell(); - assert(cell.is_live()); - actual = { cell.value().begin(), cell.value().end() }; + auto c = cell->as_atomic_cell(); + assert(c.is_live()); + actual = { c.value().begin(), c.value().end() }; } else { - auto cell = i->second.as_collection_mutation(); + auto c = cell->as_collection_mutation(); auto type = dynamic_pointer_cast(col_def->type); - actual = type->to_value(type->deserialize_mutation_form(cell), + actual = type->to_value(type->deserialize_mutation_form(c), serialization_format::internal()); } assert(col_def->type->equal(actual, col_def->type->decompose(expected))); diff --git a/tests/urchin/mutation_test.cc b/tests/urchin/mutation_test.cc index bab9128100..78a29edfa5 100644 --- a/tests/urchin/mutation_test.cc +++ b/tests/urchin/mutation_test.cc @@ -33,9 +33,9 @@ BOOST_AUTO_TEST_CASE(test_mutation_is_applied) { cf.apply(std::move(m)); row& r = cf.find_or_create_row_slow(key, c_key); - auto i = r.find(r1_col.id); - BOOST_REQUIRE(i != r.end()); - auto cell = i->second.as_atomic_cell(); + auto i = r.find_cell(r1_col.id); + BOOST_REQUIRE(i); + auto cell = i->as_atomic_cell(); BOOST_REQUIRE(cell.is_live()); BOOST_REQUIRE(int32_type->equal(cell.value(), int32_type->decompose(3))); } @@ -124,9 +124,9 @@ BOOST_AUTO_TEST_CASE(test_map_mutations) { cf.apply(m2o); row& r = cf.find_or_create_partition_slow(key).static_row(); - auto i = r.find(column.id); - BOOST_REQUIRE(i != r.end()); - auto cell = i->second.as_collection_mutation(); + auto i = r.find_cell(column.id); + BOOST_REQUIRE(i); + auto cell = i->as_collection_mutation(); auto muts = my_map_type->deserialize_mutation_form(cell); BOOST_REQUIRE(muts.cells.size() == 3); // FIXME: more strict tests @@ -157,9 +157,9 @@ BOOST_AUTO_TEST_CASE(test_set_mutations) { cf.apply(m2o); row& r = cf.find_or_create_partition_slow(key).static_row(); - auto i = r.find(column.id); - BOOST_REQUIRE(i != r.end()); - auto cell = i->second.as_collection_mutation(); + auto i = r.find_cell(column.id); + BOOST_REQUIRE(i); + auto cell = i->as_collection_mutation(); auto muts = my_set_type->deserialize_mutation_form(cell); BOOST_REQUIRE(muts.cells.size() == 3); // FIXME: more strict tests @@ -191,9 +191,9 @@ BOOST_AUTO_TEST_CASE(test_list_mutations) { cf.apply(m2o); row& r = cf.find_or_create_partition_slow(key).static_row(); - auto i = r.find(column.id); - BOOST_REQUIRE(i != r.end()); - auto cell = i->second.as_collection_mutation(); + auto i = r.find_cell(column.id); + BOOST_REQUIRE(i); + auto cell = i->as_collection_mutation(); auto muts = my_list_type->deserialize_mutation_form(cell); BOOST_REQUIRE(muts.cells.size() == 4); // FIXME: more strict tests @@ -223,9 +223,9 @@ BOOST_AUTO_TEST_CASE(test_multiple_memtables_one_partition) { auto c_key = clustering_key::from_exploded(*s, {int32_type->decompose(c1)}); auto r = cf.find_row(dht::global_partitioner().decorate_key(*s, key), c_key); BOOST_REQUIRE(r); - auto i = r->find(r1_col.id); - BOOST_REQUIRE(i != r->end()); - auto cell = i->second.as_atomic_cell(); + auto i = r->find_cell(r1_col.id); + BOOST_REQUIRE(i); + auto cell = i->as_atomic_cell(); BOOST_REQUIRE(cell.is_live()); BOOST_REQUIRE(int32_type->equal(cell.value(), int32_type->decompose(r1))); }; @@ -267,9 +267,9 @@ BOOST_AUTO_TEST_CASE(test_multiple_memtables_multiple_partitions) { auto p1 = boost::any_cast(int32_type->deserialize(pk._key.explode(*s)[0])); for (const rows_entry& re : mp.range(*s, query::range())) { auto c1 = boost::any_cast(int32_type->deserialize(re.key().explode(*s)[0])); - auto i = re.row().cells.find(r1_col.id); - if (i != re.row().cells.end()) { - result[p1][c1] = boost::any_cast(int32_type->deserialize(i->second.as_atomic_cell().value())); + auto cell = re.row().cells.find_cell(r1_col.id); + if (cell) { + result[p1][c1] = boost::any_cast(int32_type->deserialize(cell->as_atomic_cell().value())); } } return true; diff --git a/thrift/handler.cc b/thrift/handler.cc index 593ab5ecb0..974af2bcd5 100644 --- a/thrift/handler.cc +++ b/thrift/handler.cc @@ -148,7 +148,7 @@ public: // FIXME: force limit count? while (beg != end && count--) { const column_definition& def = range.reversed ? *--end : *beg++; - atomic_cell_view cell = (*rw).at(def.id).as_atomic_cell(); + atomic_cell_view cell = (*rw).cell_at(def.id).as_atomic_cell(); if (def.is_atomic()) { if (cell.is_live()) { // FIXME: we should actually use tombstone information from all levels Column col; From f656ae8ed47ef01cea87fc91a6b0e9ad2a078f50 Mon Sep 17 00:00:00 2001 From: Tomasz Grabiec Date: Mon, 11 May 2015 16:53:50 +0200 Subject: [PATCH 5/6] db: Encapsulate deletable_row fields --- cql3/statements/update_statement.cc | 2 +- database.cc | 2 +- mutation.cc | 4 ++-- mutation_partition.cc | 22 +++++++++++----------- mutation_partition.hh | 27 +++++++++++++++------------ mutation_partition_applier.hh | 4 ++-- mutation_partition_serializer.cc | 12 ++++++------ partition_builder.hh | 6 +++--- sstables/partition.cc | 2 +- sstables/sstables.cc | 8 ++++---- tests/urchin/mutation_test.cc | 2 +- 11 files changed, 47 insertions(+), 44 deletions(-) diff --git a/cql3/statements/update_statement.cc b/cql3/statements/update_statement.cc index a4ddd40a0f..84271786e5 100644 --- a/cql3/statements/update_statement.cc +++ b/cql3/statements/update_statement.cc @@ -59,7 +59,7 @@ void update_statement::add_update_for_key(mutation& m, const exploded_clustering } else { if (type == statement_type::INSERT && prefix) { auto& row = m.partition().clustered_row(clustering_key::from_clustering_prefix(*s, prefix)); - row.created_at = params.timestamp(); + row.apply(params.timestamp()); } } diff --git a/database.cc b/database.cc index 6b233af4c0..f8d373aa92 100644 --- a/database.cc +++ b/database.cc @@ -179,7 +179,7 @@ column_family::for_all_partitions_slow(std::functionrow().t.apply(entry.row().t); - i->row().created_at = std::max(i->row().created_at, entry.row().created_at); - merge_cells(i->row().cells, entry.row().cells, find_regular_column_def); + i->row().apply(entry.row().deleted_at()); + i->row().apply(entry.row().created_at()); + merge_cells(i->row().cells(), entry.row().cells(), find_regular_column_def); } } } @@ -94,7 +94,7 @@ mutation_partition::tombstone_for_row(const schema& schema, const clustering_key auto j = _rows.find(key, rows_entry::compare(schema)); if (j != _rows.end()) { - t.apply(j->row().t); + t.apply(j->row().deleted_at()); } return t; @@ -103,7 +103,7 @@ mutation_partition::tombstone_for_row(const schema& schema, const clustering_key tombstone mutation_partition::tombstone_for_row(const schema& schema, const rows_entry& e) const { tombstone t = range_tombstone_for_row(schema, e.key()); - t.apply(e.row().t); + t.apply(e.row().deleted_at()); return t; } @@ -160,7 +160,7 @@ mutation_partition::find_row(const clustering_key& key) const { if (i == _rows.end()) { return nullptr; } - return &i->row().cells; + return &i->row().cells(); } deletable_row& @@ -303,10 +303,10 @@ mutation_partition::query(const schema& s, // only one lookup for a full-tuple singular range though. for (const rows_entry& e : range(s, row_range)) { auto& row = e.row(); - auto&& cells = row.cells; + auto&& cells = row.cells(); auto row_tombstone = tombstone_for_row(s, e); - auto row_is_live = row.created_at > row_tombstone.timestamp; + auto row_is_live = row.created_at() > row_tombstone.timestamp; // row_is_live is true for rows created using 'insert' statement // which are not deleted yet. Such rows are considered as present @@ -340,7 +340,7 @@ operator<<(std::ostream& os, const row& r) { std::ostream& operator<<(std::ostream& os, const deletable_row& dr) { - return fprint(os, "{deletable_row: %s %s %s}", dr.created_at, dr.t, dr.cells); + return fprint(os, "{deletable_row: %s %s %s}", dr._created_at, dr._deleted_at, dr._cells); } std::ostream& @@ -370,10 +370,10 @@ rows_equal(const schema& s, const row& r1, const row& r2) { bool deletable_row::equal(const schema& s, const deletable_row& other) const { - if (t != other.t || created_at != other.created_at) { + if (_deleted_at != other._deleted_at || _created_at != other._created_at) { return false; } - return rows_equal(s, cells, other.cells); + return rows_equal(s, _cells, other._cells); } bool diff --git a/mutation_partition.hh b/mutation_partition.hh index 79cfa41357..651f123c69 100644 --- a/mutation_partition.hh +++ b/mutation_partition.hh @@ -60,22 +60,25 @@ public: std::ostream& operator<<(std::ostream& os, const row::value_type& rv); std::ostream& operator<<(std::ostream& os, const row& r); -// FIXME: Encapsulate -struct deletable_row final { - tombstone t; - api::timestamp_type created_at = api::missing_timestamp; - row cells; +class deletable_row final { + tombstone _deleted_at; + api::timestamp_type _created_at = api::missing_timestamp; + row _cells; +public: + deletable_row() {} - void apply(tombstone t_) { - t.apply(t_); + void apply(tombstone deleted_at) { + _deleted_at.apply(deleted_at); } - void apply(api::timestamp_type new_created_at) { - if (new_created_at > created_at) { - created_at = new_created_at; - } + void apply(api::timestamp_type created_at) { + _created_at = std::max(_created_at, created_at); } - +public: + tombstone deleted_at() const { return _deleted_at; } + api::timestamp_type created_at() const { return _created_at; } + const row& cells() const { return _cells; } + row& cells() { return _cells; } friend std::ostream& operator<<(std::ostream& os, const deletable_row& dr); bool equal(const schema& s, const deletable_row& other) const; }; diff --git a/mutation_partition_applier.hh b/mutation_partition_applier.hh index e12051a427..0a222a0853 100644 --- a/mutation_partition_applier.hh +++ b/mutation_partition_applier.hh @@ -43,12 +43,12 @@ public: } virtual void accept_row_cell(column_id id, atomic_cell_view cell) override { - _current_row->cells.apply(id, atomic_cell_or_collection(cell), + _current_row->cells().apply(id, atomic_cell_or_collection(cell), [this](column_id id) -> const column_definition& { return _schema.regular_column_at(id); }); } virtual void accept_row_cell(column_id id, collection_mutation::view collection) override { - _current_row->cells.apply(id, atomic_cell_or_collection(collection), + _current_row->cells().apply(id, atomic_cell_or_collection(collection), [this](column_id id) -> const column_definition& { return _schema.regular_column_at(id); }); } }; diff --git a/mutation_partition_serializer.cc b/mutation_partition_serializer.cc index 9ece33f182..2df70aacfe 100644 --- a/mutation_partition_serializer.cc +++ b/mutation_partition_serializer.cc @@ -53,9 +53,9 @@ mutation_partition_serializer::size(const schema& schema, const mutation_partiti for (const rows_entry& e : p.clustered_rows()) { size += clustering_key_view_serializer(e.key()).size(); size += sizeof(api::timestamp_type); // e.row().created_at - size += tombstone_serializer(e.row().t).size(); + size += tombstone_serializer(e.row().deleted_at()).size(); size += sizeof(count_type); // e.row().cells.size() - for (auto&& cell_entry : e.row().cells) { + for (auto&& cell_entry : e.row().cells()) { size += sizeof(column_id); const column_definition& def = schema.regular_column_at(cell_entry.first); if (def.is_atomic()) { @@ -96,10 +96,10 @@ mutation_partition_serializer::write(data_output& out, const schema& schema, con // rows for (const rows_entry& e : p.clustered_rows()) { clustering_key_view_serializer::write(out, e.key()); - out.write(e.row().created_at); - tombstone_serializer::write(out, e.row().t); - out.write(e.row().cells.size()); - for (auto&& cell_entry : e.row().cells) { + out.write(e.row().created_at()); + tombstone_serializer::write(out, e.row().deleted_at()); + out.write(e.row().cells().size()); + for (auto&& cell_entry : e.row().cells()) { out.write(cell_entry.first); const column_definition& def = schema.regular_column_at(cell_entry.first); if (def.is_atomic()) { diff --git a/partition_builder.hh b/partition_builder.hh index 5bb6261c62..1f6adece6d 100644 --- a/partition_builder.hh +++ b/partition_builder.hh @@ -41,18 +41,18 @@ public: virtual void accept_row(clustering_key_view key, api::timestamp_type created_at, tombstone deleted_at) override { deletable_row& r = _partition.clustered_row(_schema, key); - r.created_at = created_at; + r.apply(created_at); r.apply(deleted_at); _current_row = &r; } virtual void accept_row_cell(column_id id, atomic_cell_view cell) override { - row& r = _current_row->cells; + row& r = _current_row->cells(); r.append_cell(id, atomic_cell_or_collection(cell)); } virtual void accept_row_cell(column_id id, collection_mutation::view collection) override { - row& r = _current_row->cells; + row& r = _current_row->cells(); r.append_cell(id, atomic_cell_or_collection(collection)); } }; diff --git a/sstables/partition.cc b/sstables/partition.cc index 345c3b8056..3a33d36d75 100644 --- a/sstables/partition.cc +++ b/sstables/partition.cc @@ -184,7 +184,7 @@ public: if (col.cell.size() == 0) { auto clustering_key = clustering_key::from_clustering_prefix(*_schema, clustering_prefix); auto& dr = mut->partition().clustered_row(clustering_key); - dr.created_at = timestamp; + dr.apply(timestamp); return; } diff --git a/sstables/sstables.cc b/sstables/sstables.cc index 5f3f6f0f50..31d54a04cf 100644 --- a/sstables/sstables.cc +++ b/sstables/sstables.cc @@ -933,14 +933,14 @@ static future<> write_cell(output_stream& out, atomic_cell_view cell) { static future<> write_row_marker(output_stream& out, const rows_entry& clustered_row, bytes& clustering_key) { // Missing created_at (api::missing_timestamp) means no row marker. - if (clustered_row.row().created_at == api::missing_timestamp) { + if (clustered_row.row().created_at() == api::missing_timestamp) { return make_ready_future<>(); } // Write row mark cell to the beginning of clustered row. return write_column_name(out, clustering_key, {}).then([&out, &clustered_row] { column_mask mask = column_mask::none; - uint64_t timestamp = clustered_row.row().created_at; + uint64_t timestamp = clustered_row.row().created_at(); uint32_t value_length = 0; return write(out, mask, timestamp, value_length); @@ -956,10 +956,10 @@ static future<> write_clustered_row(output_stream& out, schema_ptr schema, return write_row_marker(out, clustered_row, clustering_key).then( [&out, &clustered_row, schema, &clustering_key] { // FIXME: Before writing cells, range tombstone must be written if the row has any (deletable_row::t). - assert(!clustered_row.row().t); + assert(!clustered_row.row().deleted_at()); // Write all cells of a partition's row. - return do_for_each(clustered_row.row().cells, [&out, schema, &clustering_key] (auto& value) { + return do_for_each(clustered_row.row().cells(), [&out, schema, &clustering_key] (auto& value) { auto column_id = value.first; auto&& column_definition = schema->regular_column_at(column_id); // non atomic cell isn't supported yet. atomic cell maps to a single trift cell. diff --git a/tests/urchin/mutation_test.cc b/tests/urchin/mutation_test.cc index 78a29edfa5..6ab6fdfd4e 100644 --- a/tests/urchin/mutation_test.cc +++ b/tests/urchin/mutation_test.cc @@ -267,7 +267,7 @@ BOOST_AUTO_TEST_CASE(test_multiple_memtables_multiple_partitions) { auto p1 = boost::any_cast(int32_type->deserialize(pk._key.explode(*s)[0])); for (const rows_entry& re : mp.range(*s, query::range())) { auto c1 = boost::any_cast(int32_type->deserialize(re.key().explode(*s)[0])); - auto cell = re.row().cells.find_cell(r1_col.id); + auto cell = re.row().cells().find_cell(r1_col.id); if (cell) { result[p1][c1] = boost::any_cast(int32_type->deserialize(cell->as_atomic_cell().value())); } From 8fedebd16689cf492076d572f0929f809ccafb7d Mon Sep 17 00:00:00 2001 From: Tomasz Grabiec Date: Wed, 13 May 2015 08:55:08 +0200 Subject: [PATCH 6/6] db: Add clarifying description to query_command and partition_slice --- query-request.hh | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/query-request.hh b/query-request.hh index 54151f7cc5..1811d22928 100644 --- a/query-request.hh +++ b/query-request.hh @@ -119,6 +119,8 @@ std::ostream& operator<<(std::ostream& out, const range& r) { using partition_range = range; using clustering_range = range; +// Specifies subset of rows, columns and cell attributes to be returned in a query. +// Can be accessed across cores. class partition_slice { public: enum class option { send_clustering_key, send_partition_key, send_timestamp_and_expiry }; @@ -142,6 +144,9 @@ public: friend std::ostream& operator<<(std::ostream& out, const partition_slice& ps); }; +// Full specification of a query to the database. +// Intended for passing across replicas. +// Can be accessed across cores. class read_command { public: utils::UUID cf_id;