diff --git a/configure.py b/configure.py index 01d8c885ee..599a1ecd8d 100755 --- a/configure.py +++ b/configure.py @@ -122,6 +122,7 @@ urchin_tests = [ 'tests/perf/perf_hash', 'tests/perf/perf_cql_parser', 'tests/perf/perf_simple_query', + 'tests/perf/perf_sstable_index', 'tests/cql_query_test', 'tests/storage_proxy_test', 'tests/mutation_reader_test', @@ -394,7 +395,7 @@ deps = { for t in urchin_tests: deps[t] = urchin_tests_dependencies + [t + '.cc'] - if 'types_test' not in t and 'keys_test' not in t and 'partitioner_test' not in t and 'map_difference_test' not in t and 'frozen_mutation_test' not in t and 'perf_mutation' not in t and 'cartesian_product_test' not in t and 'perf_hash' not in t and 'perf_cql_parser' not in t and 'message' not in t and 'perf_simple_query' not in t and 'serialization' not in t and t != 'tests/gossip' and 'compound_test' not in t and 'range_test' not in t and 'crc_test' not in t: + if 'types_test' not in t and 'keys_test' not in t and 'partitioner_test' not in t and 'map_difference_test' not in t and 'frozen_mutation_test' not in t and 'perf_mutation' not in t and 'cartesian_product_test' not in t and 'perf_hash' not in t and 'perf_cql_parser' not in t and 'message' not in t and 'perf_simple_query' not in t and 'serialization' not in t and t != 'tests/gossip' and 'compound_test' not in t and 'range_test' not in t and 'crc_test' not in t and 'perf_sstable_index' not in t: deps[t] += urchin_tests_seastar_deps deps['tests/sstable_test'] += ['tests/sstable_datafile_test.cc'] diff --git a/dht/byte_ordered_partitioner.hh b/dht/byte_ordered_partitioner.hh index 06d223e87e..62cd10c1fa 100644 --- a/dht/byte_ordered_partitioner.hh +++ b/dht/byte_ordered_partitioner.hh @@ -29,11 +29,8 @@ public: virtual bool preserves_order() override { return true; } virtual std::map describe_ownership(const std::vector& sorted_tokens) override; virtual data_type get_token_validator() override { return bytes_type; } - virtual bool is_equal(const token& t1, 
const token& t2) override { - return compare_unsigned(t1._data, t2._data) == 0; - } - virtual bool is_less(const token& t1, const token& t2) override { - return compare_unsigned(t1._data, t2._data) < 0; + virtual int tri_compare(const token& t1, const token& t2) override { + return compare_unsigned(t1._data, t2._data); } virtual token midpoint(const token& t1, const token& t2) const; virtual sstring to_sstring(const dht::token& t) const override { diff --git a/dht/i_partitioner.cc b/dht/i_partitioner.cc index b35f2ba611..d20b265695 100644 --- a/dht/i_partitioner.cc +++ b/dht/i_partitioner.cc @@ -94,35 +94,28 @@ static inline unsigned char get_byte(bytes_view b, size_t off) { } } -bool i_partitioner::is_equal(const token& t1, const token& t2) { - - size_t sz = std::max(t1._data.size(), t2._data.size()); - - for (size_t i = 0; i < sz; i++) { - auto b1 = get_byte(t1._data, i); - auto b2 = get_byte(t2._data, i); - if (b1 != b2) { - return false; - } - } - return true; - -} - -bool i_partitioner::is_less(const token& t1, const token& t2) { - +int i_partitioner::tri_compare(const token& t1, const token& t2) { size_t sz = std::max(t1._data.size(), t2._data.size()); for (size_t i = 0; i < sz; i++) { auto b1 = get_byte(t1._data, i); auto b2 = get_byte(t2._data, i); if (b1 < b2) { - return true; + return -1; } else if (b1 > b2) { - return false; + return 1; } } - return false; + return 0; +} + +int tri_compare(const token& t1, const token& t2) { + if (t1._kind == t2._kind) { + return global_partitioner().tri_compare(t1, t2); + } else if (t1._kind < t2._kind) { + return -1; + } + return 1; } bool operator==(const token& t1, const token& t2) @@ -188,19 +181,20 @@ decorated_key::equal(const schema& s, const decorated_key& other) const { int decorated_key::tri_compare(const schema& s, const decorated_key& other) const { - if (_token == other._token) { - return _key.legacy_tri_compare(s, other._key); + auto r = dht::tri_compare(_token, other._token); + if (r != 0) { + return r; } 
else { - return _token < other._token ? -1 : 1; + return _key.legacy_tri_compare(s, other._key); } } int decorated_key::tri_compare(const schema& s, const ring_position& other) const { - if (_token != other.token()) { - return _token < other.token() ? -1 : 1; - } - if (other.has_key()) { + auto r = dht::tri_compare(_token, other.token()); + if (r != 0) { + return r; + } else if (other.has_key()) { return _key.legacy_tri_compare(s, *other.key()); } return -other.relation_to_keys(); diff --git a/dht/i_partitioner.hh b/dht/i_partitioner.hh index a5256fa8e7..fa8fc1edd6 100644 --- a/dht/i_partitioner.hh +++ b/dht/i_partitioner.hh @@ -92,6 +92,7 @@ token minimum_token(); token maximum_token(); bool operator==(const token& t1, const token& t2); bool operator<(const token& t1, const token& t2); +int tri_compare(const token& t1, const token& t2); inline bool operator!=(const token& t1, const token& t2) { return std::rel_ops::operator!=(t1, t2); } inline bool operator>(const token& t1, const token& t2) { return std::rel_ops::operator>(t1, t2); } inline bool operator<=(const token& t1, const token& t2) { return std::rel_ops::operator<=(t1, t2); } @@ -231,17 +232,26 @@ public: */ virtual unsigned shard_of(const token& t) const = 0; protected: + /** + * @return < 0 if t1's _data array is less than t2's, 0 if they are equal, and > 0 otherwise. _kind comparison should be done separately. + */ + virtual int tri_compare(const token& t1, const token& t2); /** * @return true if t1's _data array is equal t2's. _kind comparison should be done separately. */ - virtual bool is_equal(const token& t1, const token& t2); + bool is_equal(const token& t1, const token& t2) { + return tri_compare(t1, t2) == 0; + } /** * @return true if t1's _data array is less then t2's. _kind comparison should be done separately. 
*/ - virtual bool is_less(const token& t1, const token& t2); + bool is_less(const token& t1, const token& t2) { + return tri_compare(t1, t2) < 0; + } friend bool operator==(const token& t1, const token& t2); friend bool operator<(const token& t1, const token& t2); + friend int tri_compare(const token& t1, const token& t2); }; // diff --git a/dht/murmur3_partitioner.cc b/dht/murmur3_partitioner.cc index 8d21ee4ec3..49bbb0a337 100644 --- a/dht/murmur3_partitioner.cc +++ b/dht/murmur3_partitioner.cc @@ -71,20 +71,15 @@ sstring murmur3_partitioner::to_sstring(const token& t) const { return ::to_sstring(long_token(t)); } -bool murmur3_partitioner::is_equal(const token& t1, const token& t2) { +int murmur3_partitioner::tri_compare(const token& t1, const token& t2) { + long l1 = long_token(t1); + long l2 = long_token(t2); - auto l1 = long_token(t1); - auto l2 = long_token(t2); - - return l1 == l2; -} - -bool murmur3_partitioner::is_less(const token& t1, const token& t2) { - - auto l1 = long_token(t1); - auto l2 = long_token(t2); - - return l1 < l2; + if (l1 == l2) { + return 0; + } else { + return l1 < l2 ? 
-1 : 1; + } } token murmur3_partitioner::midpoint(const token& t1, const token& t2) const { diff --git a/dht/murmur3_partitioner.hh b/dht/murmur3_partitioner.hh index db640a8349..371bf5f911 100644 --- a/dht/murmur3_partitioner.hh +++ b/dht/murmur3_partitioner.hh @@ -18,8 +18,7 @@ public: virtual bool preserves_order() override { return false; } virtual std::map describe_ownership(const std::vector& sorted_tokens) override; virtual data_type get_token_validator() override; - virtual bool is_equal(const token& t1, const token& t2) override; - virtual bool is_less(const token& t1, const token& t2) override; + virtual int tri_compare(const token& t1, const token& t2) override; virtual token midpoint(const token& t1, const token& t2) const override; virtual sstring to_sstring(const dht::token& t) const override; virtual unsigned shard_of(const token& t) const override; diff --git a/tests/perf/perf_sstable.hh b/tests/perf/perf_sstable.hh new file mode 100644 index 0000000000..187536564e --- /dev/null +++ b/tests/perf/perf_sstable.hh @@ -0,0 +1,95 @@ +/* + * Copyright 2015 Cloudius Systems + */ + +#pragma once +#include "../sstable_test.hh" +#include "sstables/sstables.hh" +#include +#include +#include + +using namespace sstables; + +class test_env { +public: + struct conf { + unsigned partitions; + unsigned key_size; + sstring dir; + }; + +private: + sstring dir() { + return _cfg.dir + "/" + to_sstring(engine().cpu_id()); + } + + sstring random_key() { + sstring key(sstring::initialized_later{}, size_t(_cfg.key_size)); + for (auto& b: key) { + b = _distribution(_generator); + } + return key; + } + + conf _cfg; + schema_ptr s; + std::default_random_engine _generator; + std::uniform_int_distribution _distribution; + lw_shared_ptr _mt; + std::vector> _sst; + +public: + test_env(conf cfg) : _cfg(std::move(cfg)) + , s(uncompressed_schema()) + , _distribution('@', '~') + , _mt(make_lw_shared(s)) + {} + + future<> stop() { return make_ready_future<>(); } + + void 
fill_memtable() { + for (unsigned i = 0; i < _cfg.partitions; i++) { + auto key = partition_key::from_deeply_exploded(*s, { boost::any(random_key()) }); + _mt->apply(mutation(key, s)); + } + } + + using clk = std::chrono::high_resolution_clock; + static auto now() { + return clk::now(); + } + + + // Mappers below + future flush_memtable(int idx) { + size_t partitions = _mt->partition_count(); + return test_setup::create_empty_test_dir(dir()).then([this, idx] { + auto sst = make_lw_shared("ks", "cf", dir(), idx, sstable::version_types::ka, sstable::format_types::big); + return sst->write_components(*_mt).then([sst] {}); + }).then([partitions] { + return partitions; + }); + } +}; + +// The function func should carry on with the test, and return the number of partitions processed. +// time_runs will then map reduce it, and return the aggregate partitions / sec for the whole system. +template +future<> time_runs(unsigned iterations, distributed& dt, Func func) { + using namespace boost::accumulators; + auto acc = make_lw_shared>>>(); + auto idx = boost::irange(0, int(iterations)); + return parallel_for_each(idx.begin(), idx.end(), [acc, &dt, func] (auto idx) { + auto start = test_env::now(); + return dt.map_reduce(adder(), func, std::move(idx)).then([start, acc] (size_t partitions) { + auto end = test_env::now(); + auto duration = std::chrono::duration(end - start).count(); + auto& a = *acc; + double result = partitions / duration; + a(result); + }); + }).then([acc, iterations] { + std::cout << sprint("%.2f", mean(*acc)) << " +- " << sprint("%.2f", error_of(*acc)) << " partitions / sec (" << iterations << " runs)\n"; + }); +} diff --git a/tests/perf/perf_sstable_index.cc b/tests/perf/perf_sstable_index.cc new file mode 100644 index 0000000000..3d668c4027 --- /dev/null +++ b/tests/perf/perf_sstable_index.cc @@ -0,0 +1,51 @@ +/* + * Copyright 2015 Cloudius Systems + */ + +#include +#include +#include +#include +#include +#include "perf_sstable.hh" + +using namespace 
sstables; + +static unsigned iterations = 30; + +future<> test_write(distributed& dt) { + return dt.invoke_on_all([] (test_env &t) { + t.fill_memtable(); + }).then([&dt] { + return time_runs(iterations, dt, &test_env::flush_memtable); + }); +} + +int main(int argc, char** argv) { + namespace bpo = boost::program_options; + app_template app; + app.add_options() + ("iterations", bpo::value()->default_value(30), "number of iterations") + ("partitions", bpo::value()->default_value(5000000), "number of partitions") + ("key_size", bpo::value()->default_value(128), "size of partition key") + ("testdir", bpo::value()->default_value("/var/lib/cassandra/perf-tests"), "directory in which to store the sstables"); + + return app.run(argc, argv, [&app] { + auto test = make_lw_shared>(); + + auto cfg = test_env::conf(); + iterations = app.configuration()["iterations"].as(); + cfg.partitions = app.configuration()["partitions"].as(); + cfg.key_size = app.configuration()["key_size"].as(); + sstring dir = app.configuration()["testdir"].as(); + cfg.dir = dir; + return test->start(std::move(cfg)).then([dir, test] { + engine().at_exit([test] { return test->stop(); }); + return test_setup::create_empty_test_dir(dir); + }).then([test] { + return test_write(*test).then([test] {}); + }).then([] { + return engine().exit(0); + }).or_terminate(); + }); +} diff --git a/tests/sstable_datafile_test.cc b/tests/sstable_datafile_test.cc index b407d7fe9b..6a87140d69 100644 --- a/tests/sstable_datafile_test.cc +++ b/tests/sstable_datafile_test.cc @@ -23,63 +23,6 @@ #include #include -class test_setup { - file _f; - std::function (directory_entry de)> _walker; - subscription _listing; - static sstring path; - -public: - test_setup(file f) - : _f(std::move(f)) - , _listing(_f.list_directory([this] (directory_entry de) { return _remove(de); })) { - } - ~test_setup() { - _f.close().finally([save = _f] {}); - } -protected: - future<> _create_directory(sstring name) { - return engine().make_directory(name); 
- } - - future<> _remove(directory_entry de) { - if (de.type == directory_entry_type::regular) { - return engine().remove_file(path + "/" + de.name); - } - return make_ready_future<>(); - } - future<> done() { return _listing.done(); } - - static future<> empty_test_dir() { - return engine().open_directory(path).then([] (file f) { - auto l = make_lw_shared(std::move(f)); - return l->done().then([l] { }); - }); - } - static future<> create_empty_test_dir() { - return engine().make_directory(path).then_wrapped([] (future<> f) { - try { - f.get(); - // it's fine if the directory exists, just shut down the exceptional future message - } catch (std::exception& e) {} - return empty_test_dir(); - }); - } -public: - static future<> do_with_test_directory(std::function ()>&& fut); -}; -sstring test_setup::path = "tests/sstables/tests-temporary"; - -future<> test_setup::do_with_test_directory(std::function ()>&& fut) { - return test_setup::create_empty_test_dir().then([fut = std::move(fut)] () mutable { - return fut(); - }).finally([] { - return test_setup::empty_test_dir().then([] { - return engine().remove_file(path); - }); - }); -} - using namespace sstables; static sstring some_keyspace("ks"); diff --git a/tests/sstable_test.hh b/tests/sstable_test.hh index 2910fd5406..4468c29f58 100644 --- a/tests/sstable_test.hh +++ b/tests/sstable_test.hh @@ -411,4 +411,74 @@ inline void match_collection_element(const std::pair& elemen BOOST_REQUIRE(element.second.value() == *expected_serialized_value); } } + +class test_setup { + file _f; + std::function (directory_entry de)> _walker; + sstring _path; + subscription _listing; + + static sstring& path() { + static sstring _p = "tests/sstables/tests-temporary"; + return _p; + }; + +public: + test_setup(file f, sstring path) + : _f(std::move(f)) + , _path(path) + , _listing(_f.list_directory([this] (directory_entry de) { return _remove(de); })) { + } + ~test_setup() { + _f.close().finally([save = _f] {}); + } +protected: + future<> 
_create_directory(sstring name) { + return engine().make_directory(name); + } + + future<> _remove(directory_entry de) { + sstring t = _path + "/" + de.name; + return engine().file_type(t).then([t] (std::experimental::optional det) { + auto f = make_ready_future<>(); + + if (!det) { + throw std::runtime_error("Can't determine file type\n"); + } else if (det == directory_entry_type::directory) { + f = empty_test_dir(t); + } + return f.then([t] { + return engine().remove_file(t); + }); + }); + } + future<> done() { return _listing.done(); } + + static future<> empty_test_dir(sstring p = path()) { + return engine().open_directory(p).then([p] (file f) { + auto l = make_lw_shared(std::move(f), p); + return l->done().then([l] { }); + }); + } +public: + static future<> create_empty_test_dir(sstring p = path()) { + return engine().make_directory(p).then_wrapped([p] (future<> f) { + try { + f.get(); + // it's fine if the directory exists, just shut down the exceptional future message + } catch (std::exception& e) {} + return empty_test_dir(p); + }); + } + + static future<> do_with_test_directory(std::function ()>&& fut, sstring p = path()) { + return test_setup::create_empty_test_dir(p).then([fut = std::move(fut), p] () mutable { + return fut(); + }).finally([p] { + return test_setup::empty_test_dir(p).then([p] { + return engine().remove_file(p); + }); + }); + } +}; }