From f7abbda156e3f60091c110b9dc9cbb6e70a32600 Mon Sep 17 00:00:00 2001 From: Tomasz Grabiec Date: Thu, 7 May 2015 21:11:56 +0200 Subject: [PATCH] db: Apply frozen_mutation directly We don't convert it back to mutation before applying. mutation_partition has now apply() which works on mutation_partition_view. --- database.cc | 23 +++++++++---- database.hh | 8 +++++ mutation_partition.cc | 7 ++++ mutation_partition.hh | 3 ++ mutation_partition_applier.hh | 64 +++++++++++++++++++++++++++++++++++ 5 files changed, 99 insertions(+), 6 deletions(-) create mode 100644 mutation_partition_applier.hh diff --git a/database.cc b/database.cc index 96847fdd01..30d03167bd 100644 --- a/database.cc +++ b/database.cc @@ -23,6 +23,7 @@ #include #include #include "frozen_mutation.hh" +#include "mutation_partition_applier.hh" thread_local logging::logger dblog("database"); @@ -91,6 +92,16 @@ column_family::find_or_create_partition_slow(const partition_key& key) { return find_or_create_partition(dht::global_partitioner().decorate_key(*_schema, key)); } +mutation_partition& +memtable::find_or_create_partition_slow(partition_key_view key) { + // FIXME: Perform lookup using std::pair + // to avoid unconditional copy of the partition key. + // We can't do it right now because std::map<> which holds + // partitions doesn't support heterogenous lookup. + // We could switch to boost::intrusive_map<> similar to what we have for row keys. + return find_or_create_partition(dht::global_partitioner().decorate_key(*_schema, key)); +} + mutation_partition& memtable::find_or_create_partition(const dht::decorated_key& key) { // call lower_bound so we have a hint for the insert, just in case. @@ -547,6 +558,12 @@ memtable::apply(const mutation& m) { p.apply(_schema, m.partition()); } +void +memtable::apply(const frozen_mutation& m) { + mutation_partition& p = find_or_create_partition_slow(m.key(*_schema)); + p.apply(_schema, m.partition()); +} + // Based on: // - org.apache.cassandra.db.AbstractCell#reconcile() // - org.apache.cassandra.db.BufferExpiringCell#reconcile() @@ -752,9 +769,3 @@ future<> database::stop() { return make_ready_future<>(); } - -void -column_family::apply(const frozen_mutation& m) { - // FIXME: Optimize - active_memtable().apply(m.unfreeze(_schema)); -} diff --git a/database.hh b/database.hh index 4757c8d297..b86ae50786 100644 --- a/database.hh +++ b/database.hh @@ -67,8 +67,10 @@ public: explicit memtable(schema_ptr schema); schema_ptr schema() const { return _schema; } mutation_partition& find_or_create_partition(const dht::decorated_key& key); + mutation_partition& find_or_create_partition_slow(partition_key_view key); const_mutation_partition_ptr find_partition(const dht::decorated_key& key) const; void apply(const mutation& m); + void apply(const frozen_mutation& m); const partitions_type& all_partitions() const; }; @@ -233,4 +235,10 @@ column_family::apply(const mutation& m) { return active_memtable().apply(m); } +inline +void +column_family::apply(const frozen_mutation& m) { + return active_memtable().apply(m); +} + #endif /* DATABASE_HH_ */ diff --git a/mutation_partition.cc b/mutation_partition.cc index 60dde852fb..b4427e1e8c 100644 --- a/mutation_partition.cc +++ b/mutation_partition.cc @@ -3,6 +3,7 @@ */ #include "mutation_partition.hh" +#include "mutation_partition_applier.hh" mutation_partition::mutation_partition(const mutation_partition& x) : _tombstone(x._tombstone) @@ -67,6 +68,12 @@ mutation_partition::apply(schema_ptr schema, const mutation_partition& p) { } } +void +mutation_partition::apply(schema_ptr schema, mutation_partition_view p) { + mutation_partition_applier applier(*schema, *this); + p.accept(*schema, applier); +} + tombstone mutation_partition::range_tombstone_for_row(const schema& schema, const clustering_key& key) const { tombstone t = _tombstone; diff --git a/mutation_partition.hh b/mutation_partition.hh index 5948721a08..2729c9981c 100644 --- a/mutation_partition.hh +++ b/mutation_partition.hh @@ -13,6 +13,7 @@ #include "keys.hh" #include "atomic_cell.hh" #include "query-result-writer.hh" +#include "mutation_partition_view.hh" // FIXME: Encapsulate using row = std::map; @@ -188,6 +189,7 @@ private: template friend class db::serializer; + friend class mutation_partition_applier; public: mutation_partition(schema_ptr s) : _rows(rows_entry::compare(*s)) @@ -208,6 +210,7 @@ public: // prefix must not be full void apply_row_tombstone(const schema& schema, clustering_key_prefix prefix, tombstone t); void apply(schema_ptr schema, const mutation_partition& p); + void apply(schema_ptr schema, mutation_partition_view); row& static_row() { return _static_row; } // return a set of rows_entry where each entry represents a CQL row sharing the same clustering key. const rows_type& clustered_rows() const { return _rows; } diff --git a/mutation_partition_applier.hh b/mutation_partition_applier.hh new file mode 100644 index 0000000000..2933020048 --- /dev/null +++ b/mutation_partition_applier.hh @@ -0,0 +1,64 @@ +/* + * Copyright (C) 2015 Cloudius Systems, Ltd. + */ + +#pragma once + +#include "mutation_partition_view.hh" +#include "schema.hh" + +// Mutation partition visitor which applies visited data into +// existing mutation_partition. +class mutation_partition_applier : public mutation_partition_visitor { + const schema& _schema; + mutation_partition& _p; + deletable_row* _current_row; +private: + template + void apply_cell(row& r, column_id id, atomic_cell_or_collection c, ColumnIdResolver&& resolver) { + auto i = r.lower_bound(id); + if (i == r.end() || i->first != id) { + r.emplace_hint(i, id, std::move(c)); + } else { + merge_column(resolver(id), i->second, std::move(c)); + } + } +public: + mutation_partition_applier(const schema& s, mutation_partition& target) + : _schema(s), _p(target) { } + + virtual void accept_partition_tombstone(tombstone t) override { + _p.apply(t); + } + + virtual void accept_static_cell(column_id id, atomic_cell_view cell) override { + apply_cell(_p._static_row, id, atomic_cell_or_collection(cell), + [this](column_id id) -> const column_definition& { return _schema.static_column_at(id); }); + } + + virtual void accept_static_cell(column_id id, collection_mutation::view collection) override { + apply_cell(_p._static_row, id, atomic_cell_or_collection(collection), + [this](column_id id) -> const column_definition& { return _schema.static_column_at(id); }); + } + + virtual void accept_row_tombstone(clustering_key_prefix_view prefix, tombstone t) override { + _p.apply_row_tombstone(_schema, prefix, t); + } + + virtual void accept_row(clustering_key_view key, api::timestamp_type created_at, tombstone deleted_at) override { + deletable_row& r = _p.clustered_row(_schema, key); + r.apply(created_at); + r.apply(deleted_at); + _current_row = &r; + } + + virtual void accept_row_cell(column_id id, atomic_cell_view cell) override { + apply_cell(_current_row->cells, id, atomic_cell_or_collection(cell), + [this](column_id id) -> const column_definition& { return _schema.regular_column_at(id); }); + } + + virtual void accept_row_cell(column_id id, collection_mutation::view collection) override { + apply_cell(_current_row->cells, id, atomic_cell_or_collection(collection), + [this](column_id id) -> const column_definition& { return _schema.regular_column_at(id); }); + } +};