db: Apply frozen_mutation directly

We don't convert it back to mutation before applying.

mutation_partition has now apply() which works on
mutation_partition_view.
This commit is contained in:
Tomasz Grabiec
2015-05-07 21:11:56 +02:00
parent bdcd11efe9
commit f7abbda156
5 changed files with 99 additions and 6 deletions

View File

@@ -23,6 +23,7 @@
#include <boost/algorithm/cxx11/all_of.hpp>
#include <boost/function_output_iterator.hpp>
#include "frozen_mutation.hh"
#include "mutation_partition_applier.hh"
thread_local logging::logger dblog("database");
@@ -91,6 +92,16 @@ column_family::find_or_create_partition_slow(const partition_key& key) {
return find_or_create_partition(dht::global_partitioner().decorate_key(*_schema, key));
}
mutation_partition&
memtable::find_or_create_partition_slow(partition_key_view key) {
// FIXME: Perform lookup using std::pair<token, partition_key_view>
// to avoid unconditional copy of the partition key.
// We can't do it right now because std::map<> which holds
// partitions doesn't support heterogenous lookup.
// We could switch to boost::intrusive_map<> similar to what we have for row keys.
return find_or_create_partition(dht::global_partitioner().decorate_key(*_schema, key));
}
mutation_partition&
memtable::find_or_create_partition(const dht::decorated_key& key) {
// call lower_bound so we have a hint for the insert, just in case.
@@ -547,6 +558,12 @@ memtable::apply(const mutation& m) {
p.apply(_schema, m.partition());
}
void
memtable::apply(const frozen_mutation& m) {
mutation_partition& p = find_or_create_partition_slow(m.key(*_schema));
p.apply(_schema, m.partition());
}
// Based on:
// - org.apache.cassandra.db.AbstractCell#reconcile()
// - org.apache.cassandra.db.BufferExpiringCell#reconcile()
@@ -752,9 +769,3 @@ future<>
database::stop() {
return make_ready_future<>();
}
void
column_family::apply(const frozen_mutation& m) {
// FIXME: Optimize
active_memtable().apply(m.unfreeze(_schema));
}

View File

@@ -67,8 +67,10 @@ public:
explicit memtable(schema_ptr schema);
schema_ptr schema() const { return _schema; }
mutation_partition& find_or_create_partition(const dht::decorated_key& key);
mutation_partition& find_or_create_partition_slow(partition_key_view key);
const_mutation_partition_ptr find_partition(const dht::decorated_key& key) const;
void apply(const mutation& m);
void apply(const frozen_mutation& m);
const partitions_type& all_partitions() const;
};
@@ -233,4 +235,10 @@ column_family::apply(const mutation& m) {
return active_memtable().apply(m);
}
inline
void
column_family::apply(const frozen_mutation& m) {
return active_memtable().apply(m);
}
#endif /* DATABASE_HH_ */

View File

@@ -3,6 +3,7 @@
*/
#include "mutation_partition.hh"
#include "mutation_partition_applier.hh"
mutation_partition::mutation_partition(const mutation_partition& x)
: _tombstone(x._tombstone)
@@ -67,6 +68,12 @@ mutation_partition::apply(schema_ptr schema, const mutation_partition& p) {
}
}
void
mutation_partition::apply(schema_ptr schema, mutation_partition_view p) {
mutation_partition_applier applier(*schema, *this);
p.accept(*schema, applier);
}
tombstone
mutation_partition::range_tombstone_for_row(const schema& schema, const clustering_key& key) const {
tombstone t = _tombstone;

View File

@@ -13,6 +13,7 @@
#include "keys.hh"
#include "atomic_cell.hh"
#include "query-result-writer.hh"
#include "mutation_partition_view.hh"
// FIXME: Encapsulate
using row = std::map<column_id, atomic_cell_or_collection>;
@@ -188,6 +189,7 @@ private:
template<typename T>
friend class db::serializer;
friend class mutation_partition_applier;
public:
mutation_partition(schema_ptr s)
: _rows(rows_entry::compare(*s))
@@ -208,6 +210,7 @@ public:
// prefix must not be full
void apply_row_tombstone(const schema& schema, clustering_key_prefix prefix, tombstone t);
void apply(schema_ptr schema, const mutation_partition& p);
void apply(schema_ptr schema, mutation_partition_view);
row& static_row() { return _static_row; }
// return a set of rows_entry where each entry represents a CQL row sharing the same clustering key.
const rows_type& clustered_rows() const { return _rows; }

View File

@@ -0,0 +1,64 @@
/*
* Copyright (C) 2015 Cloudius Systems, Ltd.
*/
#pragma once
#include "mutation_partition_view.hh"
#include "schema.hh"
// Mutation partition visitor which applies visited data into
// existing mutation_partition.
class mutation_partition_applier : public mutation_partition_visitor {
const schema& _schema;
mutation_partition& _p;
deletable_row* _current_row;
private:
template <typename ColumnIdResolver>
void apply_cell(row& r, column_id id, atomic_cell_or_collection c, ColumnIdResolver&& resolver) {
auto i = r.lower_bound(id);
if (i == r.end() || i->first != id) {
r.emplace_hint(i, id, std::move(c));
} else {
merge_column(resolver(id), i->second, std::move(c));
}
}
public:
mutation_partition_applier(const schema& s, mutation_partition& target)
: _schema(s), _p(target) { }
virtual void accept_partition_tombstone(tombstone t) override {
_p.apply(t);
}
virtual void accept_static_cell(column_id id, atomic_cell_view cell) override {
apply_cell(_p._static_row, id, atomic_cell_or_collection(cell),
[this](column_id id) -> const column_definition& { return _schema.static_column_at(id); });
}
virtual void accept_static_cell(column_id id, collection_mutation::view collection) override {
apply_cell(_p._static_row, id, atomic_cell_or_collection(collection),
[this](column_id id) -> const column_definition& { return _schema.static_column_at(id); });
}
virtual void accept_row_tombstone(clustering_key_prefix_view prefix, tombstone t) override {
_p.apply_row_tombstone(_schema, prefix, t);
}
virtual void accept_row(clustering_key_view key, api::timestamp_type created_at, tombstone deleted_at) override {
deletable_row& r = _p.clustered_row(_schema, key);
r.apply(created_at);
r.apply(deleted_at);
_current_row = &r;
}
virtual void accept_row_cell(column_id id, atomic_cell_view cell) override {
apply_cell(_current_row->cells, id, atomic_cell_or_collection(cell),
[this](column_id id) -> const column_definition& { return _schema.regular_column_at(id); });
}
virtual void accept_row_cell(column_id id, collection_mutation::view collection) override {
apply_cell(_current_row->cells, id, atomic_cell_or_collection(collection),
[this](column_id id) -> const column_definition& { return _schema.regular_column_at(id); });
}
};