diff --git a/database.cc b/database.cc index 5804cfa546..3b08dacd66 100644 --- a/database.cc +++ b/database.cc @@ -43,12 +43,17 @@ column_family::column_family(schema_ptr schema, config config) , _config(std::move(config)) , _memtables(make_lw_shared(memtable_list{})) , _sstables(make_lw_shared()) + , _cache(_schema, sstables_as_mutation_source(), global_cache_tracker()) { add_memtable(); } -// define in .cc, since sstable is forward-declared in .hh -column_family::column_family(column_family&& x) = default; +mutation_source +column_family::sstables_as_mutation_source() { + return [this] (const query::partition_range& r) { + return make_sstable_reader(r); + }; +} // define in .cc, since sstable is forward-declared in .hh column_family::~column_family() { @@ -249,7 +254,12 @@ column_family::make_reader(const query::partition_range& range) const { readers.emplace_back(mt->make_reader(range)); } - readers.emplace_back(make_sstable_reader(range)); + if (_config.enable_cache) { + readers.emplace_back(_cache.make_reader(range)); + } else { + readers.emplace_back(make_sstable_reader(range)); + } + return make_combined_reader(std::move(readers)); } @@ -413,6 +423,13 @@ void column_family::add_memtable() { _memtables->emplace_back(make_lw_shared(_schema)); } +future<> +column_family::update_cache(memtable& m) { + // TODO: add option to disable populating of the cache. + // TODO: move data into cache instead of copying + return _cache.update(m.make_reader()); +} + void column_family::seal_active_memtable(database* db) { auto old = _memtables->back(); @@ -445,6 +462,8 @@ column_family::seal_active_memtable(database* db) { // FIXME: write all components return newtab.write_components(*old).then([name, this, &newtab, old] { return newtab.load(); + }).then([this, old] { + return update_cache(*old); }).then_wrapped([name, this, &newtab, old, db] (future<> ret) { try { ret.get(); diff --git a/database.hh b/database.hh index fce4027752..258f3d231f 100644 --- a/database.hh +++ b/database.hh @@ -43,6 +43,7 @@ #include "memtable.hh" #include #include "mutation_reader.hh" +#include "row_cache.hh" class frozen_mutation; @@ -71,6 +72,7 @@ public: sstring datadir; bool enable_disk_writes = true; bool enable_disk_reads = true; + bool enable_cache = true; }; private: schema_ptr _schema; @@ -78,6 +80,7 @@ private: lw_shared_ptr _memtables; // generation -> sstable. Ordered by key so we can easily get the most recent. lw_shared_ptr _sstables; + mutable row_cache _cache; // Cache covers only sstables. unsigned _sstable_generation = 1; unsigned _mutation_count = 0; db::replay_position _highest_flushed_rp; @@ -85,12 +88,15 @@ private: void add_sstable(sstables::sstable&& sstable); void add_memtable(); memtable& active_memtable() { return *_memtables->back(); } + future<> update_cache(memtable&); struct merge_comparator; private: // Creates a mutation reader which covers sstables. // Caller needs to ensure that column_family remains live (FIXME: relax this). // The 'range' parameter must be live as long as the reader is used. mutation_reader make_sstable_reader(const query::partition_range& range) const; + + mutation_source sstables_as_mutation_source(); public: // Creates a mutation reader which covers all data sources for this column family. // Caller needs to ensure that column_family remains live (FIXME: relax this). @@ -106,7 +112,7 @@ public: using const_row_ptr = std::unique_ptr; public: column_family(schema_ptr schema, config cfg); - column_family(column_family&&); + column_family(column_family&&) = delete; // 'this' is being captured during construction ~column_family(); schema_ptr schema() const { return _schema; } future find_partition(const dht::decorated_key& key) const; diff --git a/mutation_reader.hh b/mutation_reader.hh index 1807c03ed7..a912e77e50 100644 --- a/mutation_reader.hh +++ b/mutation_reader.hh @@ -49,3 +49,8 @@ future<> consume(mutation_reader& reader, Consumer consumer) { }); }); } + +// mutation_source represents source of data in mutation form. The data source +// can be queried multiple times and in parallel. For each query it returns +// independent mutation_reader. +using mutation_source = std::function; diff --git a/row_cache.hh b/row_cache.hh index b16f23bec9..1bacd1d205 100644 --- a/row_cache.hh +++ b/row_cache.hh @@ -83,8 +83,6 @@ public: // Returns a reference to shard-wide cache_tracker. cache_tracker& global_cache_tracker(); -using mutation_source = std::function; - // // A data source which wraps another data source such that data obtained from the underlying data source // is cached in-memory in order to serve queries faster. diff --git a/tests/urchin/mutation_test.cc b/tests/urchin/mutation_test.cc index 61f487a349..50b672809c 100644 --- a/tests/urchin/mutation_test.cc +++ b/tests/urchin/mutation_test.cc @@ -212,7 +212,7 @@ SEASTAR_TEST_CASE(test_multiple_memtables_one_partition) { column_family::config cfg; cfg.enable_disk_reads = false; cfg.enable_disk_writes = false; - column_family cf(s, cfg); + auto cf = make_lw_shared(s, cfg); const column_definition& r1_col = *s->get_column_definition("r1"); auto key = partition_key::from_exploded(*s, {to_bytes("key1")}); @@ -221,8 +221,8 @@ SEASTAR_TEST_CASE(test_multiple_memtables_one_partition) { auto c_key = clustering_key::from_exploded(*s, {int32_type->decompose(c1)}); mutation m(key, s); m.set_clustered_cell(c_key, r1_col, make_atomic_cell(int32_type->decompose(r1))); - cf.apply(std::move(m)); - cf.seal_active_memtable(); + cf->apply(std::move(m)); + cf->seal_active_memtable(); }; insert_row(1001, 2001); insert_row(1002, 2002); @@ -230,7 +230,7 @@ SEASTAR_TEST_CASE(test_multiple_memtables_one_partition) { auto verify_row = [&] (int32_t c1, int32_t r1) { auto c_key = clustering_key::from_exploded(*s, {int32_type->decompose(c1)}); - return cf.find_row(dht::global_partitioner().decorate_key(*s, key), std::move(c_key)).then([r1, r1_col] (auto r) { + return cf->find_row(dht::global_partitioner().decorate_key(*s, key), std::move(c_key)).then([r1, r1_col] (auto r) { BOOST_REQUIRE(r); auto i = r->find_cell(r1_col.id); BOOST_REQUIRE(i); @@ -252,7 +252,8 @@ SEASTAR_TEST_CASE(test_multiple_memtables_multiple_partitions) { column_family::config cfg; cfg.enable_disk_reads = false; cfg.enable_disk_writes = false; - return do_with(column_family(s, cfg), [s] (column_family& cf) { + return do_with(make_lw_shared(s, cfg), [s] (auto& cf_ptr) mutable { + column_family& cf = *cf_ptr; std::map> shadow, result; const column_definition& r1_col = *s->get_column_definition("r1");