From 05f7c6abd59238cb855794ad4da68e8085ec0478 Mon Sep 17 00:00:00 2001 From: Avi Kivity Date: Mon, 18 May 2015 11:20:21 +0300 Subject: [PATCH 1/5] sstable: rename convert_row() to read_row() More reader-friendly. --- sstables/partition.cc | 2 +- sstables/sstables.hh | 2 +- tests/urchin/sstable_mutation_test.cc | 6 +++--- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/sstables/partition.cc b/sstables/partition.cc index 44f39ec193..a9b75741b4 100644 --- a/sstables/partition.cc +++ b/sstables/partition.cc @@ -345,7 +345,7 @@ public: }; future -sstables::sstable::convert_row(schema_ptr schema, const sstables::key& key) { +sstables::sstable::read_row(schema_ptr schema, const sstables::key& key) { assert(schema); diff --git a/sstables/sstables.hh b/sstables/sstables.hh index 6a242cd267..5fbf18e635 100644 --- a/sstables/sstables.hh +++ b/sstables/sstables.hh @@ -91,7 +91,7 @@ public: _generation = generation; } - future convert_row(schema_ptr schema, const key& k); + future read_row(schema_ptr schema, const key& k); private: static std::unordered_map> _version_string; static std::unordered_map> _format_string; diff --git a/tests/urchin/sstable_mutation_test.cc b/tests/urchin/sstable_mutation_test.cc index 8880998499..cb529004f6 100644 --- a/tests/urchin/sstable_mutation_test.cc +++ b/tests/urchin/sstable_mutation_test.cc @@ -18,7 +18,7 @@ SEASTAR_TEST_CASE(nonexistent_key) { return reusable_sst("tests/urchin/sstables/uncompressed", 1).then([] (auto sstp) { return do_with(key::from_bytes(to_bytes("invalid_key")), [sstp] (auto& key) { auto s = uncompressed_schema(); - return sstp->convert_row(s, key).then([sstp, s, &key] (auto mutation) { + return sstp->read_row(s, key).then([sstp, s, &key] (auto mutation) { BOOST_REQUIRE(!mutation); return make_ready_future<>(); }); @@ -114,7 +114,7 @@ future<> test_no_clustered(bytes&& key, std::unordered_map && return reusable_sst("tests/urchin/sstables/uncompressed", 1).then([k = std::move(key), map = std::move(map)] (auto sstp) mutable { return do_with(sstables::key(std::move(k)), [sstp, map = std::move(map)] (auto& key) { auto s = uncompressed_schema(); - return sstp->convert_row(s, key).then([sstp, s, &key, map = std::move(map)] (auto mutation) { + return sstp->read_row(s, key).then([sstp, s, &key, map = std::move(map)] (auto mutation) { BOOST_REQUIRE(mutation); auto& mp = mutation->partition(); for (auto&& e : mp.range(*s, query::range())) { @@ -180,7 +180,7 @@ future generate_clustered(bytes&& key) { return reusable_sst("tests/urchin/sstables/complex", Generation).then([k = std::move(key)] (auto sstp) mutable { return do_with(sstables::key(std::move(k)), [sstp] (auto& key) { auto s = complex_schema(); - return sstp->convert_row(s, key).then([sstp, s, &key] (auto mutation) { + return sstp->read_row(s, key).then([sstp, s, &key] (auto mutation) { BOOST_REQUIRE(mutation); return std::move(*mutation); }); From d6754823b8dc6dd647cbdb494d364d1fa6586a39 Mon Sep 17 00:00:00 2001 From: Avi Kivity Date: Mon, 18 May 2015 11:35:53 +0300 Subject: [PATCH 2/5] db: conform to sstable naming convention wrt generation --- database.cc | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/database.cc b/database.cc index cbdb366951..bb94142514 100644 --- a/database.cc +++ b/database.cc @@ -284,11 +284,13 @@ void column_family::seal_active_memtable() { auto& old = _memtables.back(); _memtables.emplace_back(_schema); - sstring name = sprint("%s/%s-%s-%d.%d-Data.db", + // FIXME: better way of ensuring we don't attemt to + // overwrite an existing table. + auto gen = _sstable_generation++ * smp::count + engine().cpu_id(); + sstring name = sprint("%s/%s-%s-%d-Data.db", _config.datadir, _schema->ks_name(), _schema->cf_name(), - engine().cpu_id(), - _sstable_generation++); + gen); if (!_config.enable_disk_writes) { return; } From 72d721ea567249e3c0f30b7e1eaeab71ee262550 Mon Sep 17 00:00:00 2001 From: Avi Kivity Date: Mon, 18 May 2015 18:35:03 +0300 Subject: [PATCH 3/5] db: futurize column_family::query() outer loop In preparation for reading from sstables, allow the outer loop of column_family::query() (iterating over partition ranges) do defer. --- database.cc | 41 ++++++++++++++++++++++++++++++----------- 1 file changed, 30 insertions(+), 11 deletions(-) diff --git a/database.cc b/database.cc index bb94142514..eb15740d5a 100644 --- a/database.cc +++ b/database.cc @@ -25,6 +25,7 @@ #include #include "frozen_mutation.hh" #include "mutation_partition_applier.hh" +#include "core/do_with.hh" thread_local logging::logger dblog("database"); @@ -647,20 +648,35 @@ compare_atomic_cell_for_merge(atomic_cell_view left, atomic_cell_view right) { return 0; } +struct query_state { + explicit query_state(const query::read_command& cmd) + : cmd(cmd) + , builder(cmd.slice) + , limit(cmd.row_limit) + , current_partition_range(cmd.partition_ranges.begin()) { + } + const query::read_command& cmd; + query::result::builder builder; + uint32_t limit; + std::vector::const_iterator current_partition_range; + bool done() const { + return !limit || current_partition_range == cmd.partition_ranges.end(); + } +}; + future> column_family::query(const query::read_command& cmd) const { - query::result::builder builder(cmd.slice); - - uint32_t limit = cmd.row_limit; - for (auto&& range : cmd.partition_ranges) { - if (limit == 0) { - break; - } + return do_with(query_state(cmd), [this] (query_state& qs) { + return do_until(std::bind(&query_state::done, &qs), [this, &qs] { + auto& cmd = qs.cmd; + auto& builder = qs.builder; + auto& limit = qs.limit; + auto&& range = *qs.current_partition_range++; if (range.is_singular()) { auto& key = range.start_value(); auto partition = find_partition_slow(key); if (!partition) { - continue; + return make_ready_future<>(); } auto p_builder = builder.add_partition(key); partition->query(*_schema, cmd.slice, limit, p_builder); @@ -680,9 +696,12 @@ column_family::query(const query::read_command& cmd) const { } else { fail(unimplemented::cause::RANGE_QUERIES); } - } - return make_ready_future>( - make_lw_shared(builder.build())); + return make_ready_future<>(); + }).then([&qs] { + return make_ready_future>( + make_lw_shared(qs.builder.build())); + }); + }); } future> From 738be63b28fe1e1c1001729fc04be1dc2971ae08 Mon Sep 17 00:00:00 2001 From: Avi Kivity Date: Tue, 19 May 2015 14:59:13 +0300 Subject: [PATCH 4/5] db: define column_family move constructor in .cc Allows using it from files that do not include sstable.hh. --- database.cc | 3 +++ database.hh | 2 +- 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/database.cc b/database.cc index eb15740d5a..80254c3227 100644 --- a/database.cc +++ b/database.cc @@ -40,6 +40,9 @@ column_family::column_family(schema_ptr schema, config config) , _memtables({memtable(_schema)}) { } +// define in .cc, since sstable is forward-declared in .hh +column_family::column_family(column_family&& x) = default; + // define in .cc, since sstable is forward-declared in .hh column_family::~column_family() { } diff --git a/database.hh b/database.hh index fbba4fbaec..0aa0d9b2b6 100644 --- a/database.hh +++ b/database.hh @@ -85,7 +85,7 @@ public: using const_row_ptr = std::unique_ptr; public: column_family(schema_ptr schema, config cfg); - column_family(column_family&&) = default; + column_family(column_family&&); ~column_family(); schema_ptr schema() const { return _schema; } const_mutation_partition_ptr find_partition(const dht::decorated_key& key) const; From db04bba20862550bdb538bfca16c45dec8f18e65 Mon Sep 17 00:00:00 2001 From: Avi Kivity Date: Mon, 18 May 2015 19:45:39 +0300 Subject: [PATCH 5/5] db: futurize the single partition query path Prepare for disk reads. --- database.cc | 110 ++++++++++++++++++++-------------- database.hh | 10 ++-- tests/urchin/cql_test_env.cc | 7 ++- tests/urchin/mutation_test.cc | 27 +++++---- thrift/handler.cc | 7 ++- 5 files changed, 95 insertions(+), 66 deletions(-) diff --git a/database.cc b/database.cc index 80254c3227..84f102bdc6 100644 --- a/database.cc +++ b/database.cc @@ -54,7 +54,7 @@ memtable::find_partition(const dht::decorated_key& key) const { return i == partitions.end() ? const_mutation_partition_ptr() : std::make_unique(i->second); } -column_family::const_mutation_partition_ptr +future column_family::find_partition(const dht::decorated_key& key) const { // FIXME: optimize for 0 or 1 entries found case mutation_partition ret(_schema); @@ -67,30 +67,31 @@ column_family::find_partition(const dht::decorated_key& key) const { } } if (any) { - return std::make_unique(std::move(ret)); + return make_ready_future(std::make_unique(std::move(ret))); } else { - return nullptr; + return make_ready_future(); } } -column_family::const_mutation_partition_ptr +future column_family::find_partition_slow(const partition_key& key) const { return find_partition(dht::global_partitioner().decorate_key(*_schema, key)); } -column_family::const_row_ptr -column_family::find_row(const dht::decorated_key& partition_key, const clustering_key& clustering_key) const { - const_mutation_partition_ptr p = find_partition(partition_key); - if (!p) { - return nullptr; - } - auto r = p->find_row(clustering_key); - if (r) { - // FIXME: remove copy if only one data source - return std::make_unique(*r); - } else { - return nullptr; - } +future +column_family::find_row(const dht::decorated_key& partition_key, clustering_key clustering_key) const { + return find_partition(partition_key).then([clustering_key = std::move(clustering_key)] (const_mutation_partition_ptr p) { + if (!p) { + return make_ready_future(); + } + auto r = p->find_row(clustering_key); + if (r) { + // FIXME: remove copy if only one data source + return make_ready_future(std::make_unique(*r)); + } else { + return make_ready_future(); + } + }); } mutation_partition& @@ -128,29 +129,46 @@ struct column_family::merge_comparator { }; template -bool +future column_family::for_all_partitions(Func&& func) const { static_assert(std::is_same>::value, "bad Func signature"); using partitions_range = boost::iterator_range; - std::vector tables; + struct iteration_state { + std::vector tables; + std::vector ptables; + nway_merger, merge_comparator> merger; + std::experimental::optional> current; + Func func; + bool ok = true; + bool more = true; + bool done() const { return !(ok && more); } + iteration_state(const column_family& cf, Func&& func) + : merger{merge_comparator(cf.schema())}, func(std::move(func)) { + } + }; + iteration_state is(*this, std::move(func)); + auto& tables = is.tables; + auto& ptables = is.ptables; + auto& merger = is.merger; for (auto&& mt : _memtables) { tables.push_back(boost::make_iterator_range(mt.all_partitions())); } - std::vector ptables; for (auto&& r : tables) { ptables.push_back(&r); } - nway_merger, merge_comparator> merger{merge_comparator(_schema)}; merger.create_heap(ptables); - bool ok = true; - bool more = true; // Can't use memtable::partitions_type::value_type due do constness - std::experimental::optional> current; - while (ok && more) { + return do_with(std::move(is), [this] (iteration_state& is) { + return do_until(std::bind(&iteration_state::done, &is), [&is, this] { + auto& more = is.more; + auto& merger = is.merger; more = merger.pop(boost::make_function_output_iterator([&] (const memtable::partitions_type::value_type& e) { auto&& key = e.first; auto&& mp = e.second; + auto& current = is.current; + auto& func = is.func; + auto& ok = is.ok; // Schema cannot have different keys if (current && !current->first.equal(*_schema, key)) { ok = func(std::move(current->first), std::move(current->second)); @@ -163,15 +181,21 @@ column_family::for_all_partitions(Func&& func) const { current = std::make_pair(key, mp); } })); - } - if (ok && current) { - ok = func(std::move(current->first), std::move(current->second)); - current = std::experimental::nullopt; - } - return ok; + return make_ready_future<>(); + }).then([this, &is] { + auto& ok = is.ok; + auto& current = is.current; + auto& func = is.func; + if (ok && current) { + ok = func(std::move(current->first), std::move(current->second)); + current = std::experimental::nullopt; + } + return make_ready_future(ok); + }); + }); } -bool +future column_family::for_all_partitions_slow(std::function func) const { return for_all_partitions(std::move(func)); } @@ -676,17 +700,21 @@ column_family::query(const query::read_command& cmd) const { auto& limit = qs.limit; auto&& range = *qs.current_partition_range++; if (range.is_singular()) { - auto& key = range.start_value(); - auto partition = find_partition_slow(key); + auto& key = range.start_value(); + return find_partition_slow(key).then([this, &qs, &key] (auto partition) { + auto& cmd = qs.cmd; + auto& builder = qs.builder; + auto& limit = qs.limit; if (!partition) { - return make_ready_future<>(); + return; } auto p_builder = builder.add_partition(key); partition->query(*_schema, cmd.slice, limit, p_builder); p_builder.finish(); limit -= p_builder.row_count(); + }); } else if (range.is_full()) { - for_all_partitions([&] (const dht::decorated_key& dk, const mutation_partition& partition) { + return for_all_partitions([&] (const dht::decorated_key& dk, const mutation_partition& partition) { auto p_builder = builder.add_partition(dk._key); partition.query(*_schema, cmd.slice, limit, p_builder); p_builder.finish(); @@ -695,7 +723,7 @@ column_family::query(const query::read_command& cmd) const { return false; } return true; - }); + }).discard_result(); } else { fail(unimplemented::cause::RANGE_QUERIES); } @@ -733,13 +761,7 @@ std::ostream& operator<<(std::ostream& os, const mutation& m) { } std::ostream& operator<<(std::ostream& out, const column_family& cf) { - out << "{\n"; - cf.for_all_partitions([&] (const dht::decorated_key& key, const mutation_partition& mp) { - out << key << " => " << mp << "\n"; - return true; - }); - out << "}"; - return out; + return fprint(out, "{column_family: %s/%s}", cf._schema->ks_name(), cf._schema->cf_name()); } std::ostream& operator<<(std::ostream& out, const database& db) { diff --git a/database.hh b/database.hh index 0aa0d9b2b6..2fdc6fd121 100644 --- a/database.hh +++ b/database.hh @@ -88,9 +88,9 @@ public: column_family(column_family&&); ~column_family(); schema_ptr schema() const { return _schema; } - const_mutation_partition_ptr find_partition(const dht::decorated_key& key) const; - const_mutation_partition_ptr find_partition_slow(const partition_key& key) const; - const_row_ptr find_row(const dht::decorated_key& partition_key, const clustering_key& clustering_key) const; + future find_partition(const dht::decorated_key& key) const; + future find_partition_slow(const partition_key& key) const; + future find_row(const dht::decorated_key& partition_key, clustering_key clustering_key) const; void apply(const frozen_mutation& m); void apply(const mutation& m); // Returns at most "cmd.limit" rows @@ -104,13 +104,13 @@ private: // so that iteration can be stopped by returning false. // Func signature: bool (const decorated_key& dk, const mutation_partition& mp) template - bool for_all_partitions(Func&& func) const; + future for_all_partitions(Func&& func) const; future<> probe_file(sstring sstdir, sstring fname); void seal_on_overflow(); public: // Iterate over all partitions. Protocol is the same as std::all_of(), // so that iteration can be stopped by returning false. - bool for_all_partitions_slow(std::function func) const; + future for_all_partitions_slow(std::function func) const; friend std::ostream& operator<<(std::ostream& out, const column_family& cf); }; diff --git a/tests/urchin/cql_test_env.cc b/tests/urchin/cql_test_env.cc index 42400a6a0a..35ec5768b0 100644 --- a/tests/urchin/cql_test_env.cc +++ b/tests/urchin/cql_test_env.cc @@ -129,9 +129,9 @@ public: column_name = std::move(column_name), expected = std::move(expected), table_name = std::move(table_name)] (database& db) { - auto& cf = db.find_column_family(ks_name, table_name); - auto schema = cf.schema(); - auto p = cf.find_partition_slow(pkey); + auto& cf = db.find_column_family(ks_name, table_name); + auto schema = cf.schema(); + return cf.find_partition_slow(pkey).then([schema, ck, column_name, expected] (column_family::const_mutation_partition_ptr p) { assert(p != nullptr); auto row = p->find_row(clustering_key::from_deeply_exploded(*schema, ck)); assert(row != nullptr); @@ -153,6 +153,7 @@ public: serialization_format::internal()); } assert(col_def->type->equal(actual, col_def->type->decompose(expected))); + }); }); } diff --git a/tests/urchin/mutation_test.cc b/tests/urchin/mutation_test.cc index e6d024c2ab..6a4a9c0913 100644 --- a/tests/urchin/mutation_test.cc +++ b/tests/urchin/mutation_test.cc @@ -8,6 +8,7 @@ #include "core/sstring.hh" #include "database.hh" #include "utils/UUID_gen.hh" +#include "core/do_with.hh" #include static sstring some_keyspace("ks"); @@ -228,14 +229,15 @@ SEASTAR_TEST_CASE(test_multiple_memtables_one_partition) { insert_row(1003, 2003); auto verify_row = [&] (int32_t c1, int32_t r1) { - auto c_key = clustering_key::from_exploded(*s, {int32_type->decompose(c1)}); - auto r = cf.find_row(dht::global_partitioner().decorate_key(*s, key), c_key); + auto c_key = clustering_key::from_exploded(*s, {int32_type->decompose(c1)}); + return cf.find_row(dht::global_partitioner().decorate_key(*s, key), std::move(c_key)).then([r1, r1_col] (auto r) { BOOST_REQUIRE(r); auto i = r->find_cell(r1_col.id); BOOST_REQUIRE(i); auto cell = i->as_atomic_cell(); BOOST_REQUIRE(cell.is_live()); BOOST_REQUIRE(int32_type->equal(cell.value(), int32_type->decompose(r1))); + }); }; verify_row(1001, 2001); verify_row(1002, 2002); @@ -244,13 +246,13 @@ SEASTAR_TEST_CASE(test_multiple_memtables_one_partition) { } SEASTAR_TEST_CASE(test_multiple_memtables_multiple_partitions) { - auto s = make_lw_shared(schema({}, some_keyspace, some_column_family, - {{"p1", int32_type}}, {{"c1", int32_type}}, {{"r1", int32_type}}, {}, utf8_type)); + auto s = make_lw_shared(schema({}, some_keyspace, some_column_family, + {{"p1", int32_type}}, {{"c1", int32_type}}, {{"r1", int32_type}}, {}, utf8_type)); - column_family::config cfg; - cfg.enable_disk_reads = false; - cfg.enable_disk_writes = false; - column_family cf(s, cfg); + column_family::config cfg; + cfg.enable_disk_reads = false; + cfg.enable_disk_writes = false; + return do_with(column_family(s, cfg), [s] (column_family& cf) { std::map> shadow, result; const column_definition& r1_col = *s->get_column_definition("r1"); @@ -275,7 +277,8 @@ SEASTAR_TEST_CASE(test_multiple_memtables_multiple_partitions) { cf.seal_active_memtable(); } - cf.for_all_partitions_slow([&] (const dht::decorated_key& pk, const mutation_partition& mp) { + return do_with(std::move(result), [&cf, s, &r1_col, shadow] (auto& result) { + return cf.for_all_partitions_slow([&] (const dht::decorated_key& pk, const mutation_partition& mp) { auto p1 = boost::any_cast(int32_type->deserialize(pk._key.explode(*s)[0])); for (const rows_entry& re : mp.range(*s, query::range())) { auto c1 = boost::any_cast(int32_type->deserialize(re.key().explode(*s)[0])); @@ -285,9 +288,11 @@ SEASTAR_TEST_CASE(test_multiple_memtables_multiple_partitions) { } } return true; + }).then([&result, shadow] (bool ok) { + BOOST_REQUIRE(shadow == result); + }); }); - BOOST_REQUIRE(shadow == result); - return make_ready_future<>(); + }); } SEASTAR_TEST_CASE(test_cell_ordering) { diff --git a/thrift/handler.cc b/thrift/handler.cc index 2afb54e997..ae5d6056d2 100644 --- a/thrift/handler.cc +++ b/thrift/handler.cc @@ -126,7 +126,6 @@ public: dk = std::move(dk), column_parent = std::move(column_parent), predicate = std::move(predicate)] (database& db) { - std::vector ret; if (!column_parent.super_column.empty()) { throw unimplemented_exception(); } @@ -134,8 +133,9 @@ public: if (predicate.__isset.column_names) { throw unimplemented_exception(); } else if (predicate.__isset.slice_range) { - auto&& range = predicate.slice_range; - column_family::const_row_ptr rw = cf.find_row(dk, clustering_key::make_empty(*cf.schema())); + auto&& range = predicate.slice_range; + return cf.find_row(dk, clustering_key::make_empty(*cf.schema())).then([&cf, range = std::move(range)] (column_family::const_row_ptr rw) { + std::vector ret; if (rw) { auto beg = cf.schema()->regular_begin(); if (!range.start.empty()) { @@ -165,6 +165,7 @@ public: } } return make_foreign(make_lw_shared(std::move(ret))); + }); } else { throw make_exception("empty SlicePredicate"); }