From 8a18d2b699490b9abf471e324164a2eff9b2edbd Mon Sep 17 00:00:00 2001 From: Tomasz Grabiec Date: Thu, 2 Jul 2015 14:10:41 +0200 Subject: [PATCH] Extract memtable implementation to memtable.cc --- configure.py | 1 + database.cc | 103 ------------------------------------------------ memtable.cc | 108 +++++++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 109 insertions(+), 103 deletions(-) create mode 100644 memtable.cc diff --git a/configure.py b/configure.py index 79bc3f84fb..dffc9b2d70 100755 --- a/configure.py +++ b/configure.py @@ -399,6 +399,7 @@ urchin_core = (['database.cc', 'mutation.cc', 'row_cache.cc', 'frozen_mutation.cc', + 'memtable.cc', 'mutation_partition.cc', 'mutation_partition_view.cc', 'mutation_partition_serializer.cc', diff --git a/database.cc b/database.cc index ca1376a9f3..41d1a2a1f6 100644 --- a/database.cc +++ b/database.cc @@ -34,11 +34,6 @@ thread_local logging::logger dblog("database"); -memtable::memtable(schema_ptr schema) - : _schema(std::move(schema)) - , partitions(dht::decorated_key::less_comparator(_schema)) { -} - column_family::column_family(schema_ptr schema, config config) : _schema(std::move(schema)) , _config(std::move(config)) @@ -60,13 +55,6 @@ column_family::sstables_as_mutation_source() { column_family::~column_family() { } -memtable::const_mutation_partition_ptr -memtable::find_partition(const dht::decorated_key& key) const { - auto i = partitions.find(key); - // FIXME: remove copy if only one data source - return i == partitions.end() ? const_mutation_partition_ptr() : std::make_unique(i->second); -} - static bool belongs_to_current_shard(const mutation& m) { return dht::shard_of(m.token()) == engine().cpu_id(); @@ -186,31 +174,6 @@ column_family::find_row(const dht::decorated_key& partition_key, clustering_key }); } -mutation_partition& -memtable::find_or_create_partition_slow(partition_key_view key) { - // FIXME: Perform lookup using std::pair - // to avoid unconditional copy of the partition key. - // We can't do it right now because std::map<> which holds - // partitions doesn't support heterogenous lookup. - // We could switch to boost::intrusive_map<> similar to what we have for row keys. - return find_or_create_partition(dht::global_partitioner().decorate_key(*_schema, key)); -} - -mutation_partition& -memtable::find_or_create_partition(const dht::decorated_key& key) { - // call lower_bound so we have a hint for the insert, just in case. - auto i = partitions.lower_bound(key); - if (i == partitions.end() || !key.equal(*_schema, i->first)) { - i = partitions.emplace_hint(i, std::make_pair(std::move(key), mutation_partition(_schema))); - } - return i->second; -} - -const memtable::partitions_type& -memtable::all_partitions() const { - return partitions; -} - struct column_family::merge_comparator { schema_ptr _schema; using ptr = boost::iterator_range*; @@ -220,44 +183,6 @@ struct column_family::merge_comparator { } }; -boost::iterator_range -memtable::slice(const query::partition_range& range) const { - if (range.is_singular()) { - const query::ring_position& pos = range.start_value(); - - if (!pos.has_key()) { - fail(unimplemented::cause::RANGE_QUERIES); - } - - auto i = partitions.find(pos.as_decorated_key()); - if (i != partitions.end()) { - return boost::make_iterator_range(i, std::next(i)); - } else { - return boost::make_iterator_range(i, i); - } - } else { - if (!range.is_full()) { - fail(unimplemented::cause::RANGE_QUERIES); - } - - return boost::make_iterator_range(partitions.begin(), partitions.end()); - } -} - -mutation_reader -memtable::make_reader(const query::partition_range& range) const { - auto r = slice(range); - return [begin = r.begin(), end = r.end(), self = shared_from_this()] () mutable { - if (begin != end) { - auto m = mutation(self->_schema, begin->first, begin->second); - ++begin; - return make_ready_future(std::experimental::make_optional(std::move(m))); - } else { - return make_ready_future(); - } - }; -} - mutation_reader column_family::make_reader(const query::partition_range& range) const { std::vector readers; @@ -315,13 +240,6 @@ column_family::for_all_partitions_slow(std::function (directory_entry de)> _walker; @@ -936,27 +854,6 @@ database::existing_index_names(const sstring& cf_to_exclude) const { return names; } -void -memtable::update(const db::replay_position& rp) { - if (_replay_position < rp) { - _replay_position = rp; - } -} - -void -memtable::apply(const mutation& m, const db::replay_position& rp) { - mutation_partition& p = find_or_create_partition(m.decorated_key()); - p.apply(*_schema, m.partition()); - update(rp); -} - -void -memtable::apply(const frozen_mutation& m, const db::replay_position& rp) { - mutation_partition& p = find_or_create_partition_slow(m.key(*_schema)); - p.apply(*_schema, m.partition()); - update(rp); -} - // Based on: // - org.apache.cassandra.db.AbstractCell#reconcile() // - org.apache.cassandra.db.BufferExpiringCell#reconcile() diff --git a/memtable.cc b/memtable.cc new file mode 100644 index 0000000000..de66f51aac --- /dev/null +++ b/memtable.cc @@ -0,0 +1,108 @@ +/* + * Copyright (C) 2014 Cloudius Systems, Ltd. + */ + +#include "memtable.hh" +#include "frozen_mutation.hh" + +memtable::memtable(schema_ptr schema) + : _schema(std::move(schema)) + , partitions(dht::decorated_key::less_comparator(_schema)) { +} + +memtable::const_mutation_partition_ptr +memtable::find_partition(const dht::decorated_key& key) const { + auto i = partitions.find(key); + // FIXME: remove copy if only one data source + return i == partitions.end() ? const_mutation_partition_ptr() : std::make_unique(i->second); +} + +mutation_partition& +memtable::find_or_create_partition_slow(partition_key_view key) { + // FIXME: Perform lookup using std::pair + // to avoid unconditional copy of the partition key. + // We can't do it right now because std::map<> which holds + // partitions doesn't support heterogenous lookup. + // We could switch to boost::intrusive_map<> similar to what we have for row keys. + return find_or_create_partition(dht::global_partitioner().decorate_key(*_schema, key)); +} + +mutation_partition& +memtable::find_or_create_partition(const dht::decorated_key& key) { + // call lower_bound so we have a hint for the insert, just in case. + auto i = partitions.lower_bound(key); + if (i == partitions.end() || !key.equal(*_schema, i->first)) { + i = partitions.emplace_hint(i, std::make_pair(std::move(key), mutation_partition(_schema))); + } + return i->second; +} + +const memtable::partitions_type& +memtable::all_partitions() const { + return partitions; +} + +boost::iterator_range +memtable::slice(const query::partition_range& range) const { + if (range.is_singular()) { + const query::ring_position& pos = range.start_value(); + + if (!pos.has_key()) { + fail(unimplemented::cause::RANGE_QUERIES); + } + + auto i = partitions.find(pos.as_decorated_key()); + if (i != partitions.end()) { + return boost::make_iterator_range(i, std::next(i)); + } else { + return boost::make_iterator_range(i, i); + } + } else { + if (!range.is_full()) { + fail(unimplemented::cause::RANGE_QUERIES); + } + + return boost::make_iterator_range(partitions.begin(), partitions.end()); + } +} + +mutation_reader +memtable::make_reader(const query::partition_range& range) const { + auto r = slice(range); + return [begin = r.begin(), end = r.end(), self = shared_from_this()] () mutable { + if (begin != end) { + auto m = mutation(self->_schema, begin->first, begin->second); + ++begin; + return make_ready_future(std::experimental::make_optional(std::move(m))); + } else { + return make_ready_future(); + } + }; +} + +row& +memtable::find_or_create_row_slow(const partition_key& partition_key, const clustering_key& clustering_key) { + mutation_partition& p = find_or_create_partition_slow(partition_key); + return p.clustered_row(clustering_key).cells(); +} + +void +memtable::update(const db::replay_position& rp) { + if (_replay_position < rp) { + _replay_position = rp; + } +} + +void +memtable::apply(const mutation& m, const db::replay_position& rp) { + mutation_partition& p = find_or_create_partition(m.decorated_key()); + p.apply(*_schema, m.partition()); + update(rp); +} + +void +memtable::apply(const frozen_mutation& m, const db::replay_position& rp) { + mutation_partition& p = find_or_create_partition_slow(m.key(*_schema)); + p.apply(*_schema, m.partition()); + update(rp); +}