Merge "sstable index write benchmark"
"I am currently looking at the performance of our index_read, since it was in the past pinpointed as the source of problems. While the read side is the more interesting one, I would like to test both - besides anything else, it is easier to test reads after writes, so we don't have to create synthetic data with outside tools. This patch introduces the write side benchmark (read side will hopefully come tomorrow). While the write side is, as mentioned, not the most interesting part, I did see something standing out in the flamegraph that allowed me to optimize one particular function, yielding an 8.6% improvement."
This commit is contained in:
@@ -122,6 +122,7 @@ urchin_tests = [
|
||||
'tests/perf/perf_hash',
|
||||
'tests/perf/perf_cql_parser',
|
||||
'tests/perf/perf_simple_query',
|
||||
'tests/perf/perf_sstable_index',
|
||||
'tests/cql_query_test',
|
||||
'tests/storage_proxy_test',
|
||||
'tests/mutation_reader_test',
|
||||
@@ -394,7 +395,7 @@ deps = {
|
||||
|
||||
for t in urchin_tests:
|
||||
deps[t] = urchin_tests_dependencies + [t + '.cc']
|
||||
if 'types_test' not in t and 'keys_test' not in t and 'partitioner_test' not in t and 'map_difference_test' not in t and 'frozen_mutation_test' not in t and 'perf_mutation' not in t and 'cartesian_product_test' not in t and 'perf_hash' not in t and 'perf_cql_parser' not in t and 'message' not in t and 'perf_simple_query' not in t and 'serialization' not in t and t != 'tests/gossip' and 'compound_test' not in t and 'range_test' not in t and 'crc_test' not in t:
|
||||
if 'types_test' not in t and 'keys_test' not in t and 'partitioner_test' not in t and 'map_difference_test' not in t and 'frozen_mutation_test' not in t and 'perf_mutation' not in t and 'cartesian_product_test' not in t and 'perf_hash' not in t and 'perf_cql_parser' not in t and 'message' not in t and 'perf_simple_query' not in t and 'serialization' not in t and t != 'tests/gossip' and 'compound_test' not in t and 'range_test' not in t and 'crc_test' not in t and 'perf_sstable_index' not in t:
|
||||
deps[t] += urchin_tests_seastar_deps
|
||||
|
||||
deps['tests/sstable_test'] += ['tests/sstable_datafile_test.cc']
|
||||
|
||||
@@ -29,11 +29,8 @@ public:
|
||||
virtual bool preserves_order() override { return true; }
|
||||
virtual std::map<token, float> describe_ownership(const std::vector<token>& sorted_tokens) override;
|
||||
virtual data_type get_token_validator() override { return bytes_type; }
|
||||
virtual bool is_equal(const token& t1, const token& t2) override {
|
||||
return compare_unsigned(t1._data, t2._data) == 0;
|
||||
}
|
||||
virtual bool is_less(const token& t1, const token& t2) override {
|
||||
return compare_unsigned(t1._data, t2._data) < 0;
|
||||
virtual int tri_compare(const token& t1, const token& t2) override {
|
||||
return compare_unsigned(t1._data, t2._data);
|
||||
}
|
||||
virtual token midpoint(const token& t1, const token& t2) const;
|
||||
virtual sstring to_sstring(const dht::token& t) const override {
|
||||
|
||||
@@ -94,35 +94,28 @@ static inline unsigned char get_byte(bytes_view b, size_t off) {
|
||||
}
|
||||
}
|
||||
|
||||
bool i_partitioner::is_equal(const token& t1, const token& t2) {
|
||||
|
||||
size_t sz = std::max(t1._data.size(), t2._data.size());
|
||||
|
||||
for (size_t i = 0; i < sz; i++) {
|
||||
auto b1 = get_byte(t1._data, i);
|
||||
auto b2 = get_byte(t2._data, i);
|
||||
if (b1 != b2) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
return true;
|
||||
|
||||
}
|
||||
|
||||
bool i_partitioner::is_less(const token& t1, const token& t2) {
|
||||
|
||||
int i_partitioner::tri_compare(const token& t1, const token& t2) {
|
||||
size_t sz = std::max(t1._data.size(), t2._data.size());
|
||||
|
||||
for (size_t i = 0; i < sz; i++) {
|
||||
auto b1 = get_byte(t1._data, i);
|
||||
auto b2 = get_byte(t2._data, i);
|
||||
if (b1 < b2) {
|
||||
return true;
|
||||
return -1;
|
||||
} else if (b1 > b2) {
|
||||
return false;
|
||||
return 1;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
return 0;
|
||||
}
|
||||
|
||||
int tri_compare(const token& t1, const token& t2) {
|
||||
if (t1._kind == t2._kind) {
|
||||
return global_partitioner().tri_compare(t1, t2);
|
||||
} else if (t1._kind < t2._kind) {
|
||||
return -1;
|
||||
}
|
||||
return 1;
|
||||
}
|
||||
|
||||
bool operator==(const token& t1, const token& t2)
|
||||
@@ -188,19 +181,20 @@ decorated_key::equal(const schema& s, const decorated_key& other) const {
|
||||
|
||||
int
|
||||
decorated_key::tri_compare(const schema& s, const decorated_key& other) const {
|
||||
if (_token == other._token) {
|
||||
return _key.legacy_tri_compare(s, other._key);
|
||||
auto r = dht::tri_compare(_token, other._token);
|
||||
if (r != 0) {
|
||||
return r;
|
||||
} else {
|
||||
return _token < other._token ? -1 : 1;
|
||||
return _key.legacy_tri_compare(s, other._key);
|
||||
}
|
||||
}
|
||||
|
||||
int
|
||||
decorated_key::tri_compare(const schema& s, const ring_position& other) const {
|
||||
if (_token != other.token()) {
|
||||
return _token < other.token() ? -1 : 1;
|
||||
}
|
||||
if (other.has_key()) {
|
||||
auto r = dht::tri_compare(_token, other.token());
|
||||
if (r != 0) {
|
||||
return r;
|
||||
} else if (other.has_key()) {
|
||||
return _key.legacy_tri_compare(s, *other.key());
|
||||
}
|
||||
return -other.relation_to_keys();
|
||||
|
||||
@@ -92,6 +92,7 @@ token minimum_token();
|
||||
token maximum_token();
|
||||
bool operator==(const token& t1, const token& t2);
|
||||
bool operator<(const token& t1, const token& t2);
|
||||
int tri_compare(const token& t1, const token& t2);
|
||||
inline bool operator!=(const token& t1, const token& t2) { return std::rel_ops::operator!=(t1, t2); }
|
||||
inline bool operator>(const token& t1, const token& t2) { return std::rel_ops::operator>(t1, t2); }
|
||||
inline bool operator<=(const token& t1, const token& t2) { return std::rel_ops::operator<=(t1, t2); }
|
||||
@@ -231,17 +232,26 @@ public:
|
||||
*/
|
||||
virtual unsigned shard_of(const token& t) const = 0;
|
||||
protected:
|
||||
/**
|
||||
* @return < 0 if t1's _data array is less than t2's, 0 if they are equal, and > 0 otherwise. _kind comparison should be done separately.
|
||||
*/
|
||||
virtual int tri_compare(const token& t1, const token& t2);
|
||||
/**
|
||||
* @return true if t1's _data array is equal to t2's. _kind comparison should be done separately.
|
||||
*/
|
||||
virtual bool is_equal(const token& t1, const token& t2);
|
||||
bool is_equal(const token& t1, const token& t2) {
|
||||
return tri_compare(t1, t2) == 0;
|
||||
}
|
||||
/**
|
||||
* @return true if t1's _data array is less than t2's. _kind comparison should be done separately.
|
||||
*/
|
||||
virtual bool is_less(const token& t1, const token& t2);
|
||||
bool is_less(const token& t1, const token& t2) {
|
||||
return tri_compare(t1, t2) < 0;
|
||||
}
|
||||
|
||||
friend bool operator==(const token& t1, const token& t2);
|
||||
friend bool operator<(const token& t1, const token& t2);
|
||||
friend int tri_compare(const token& t1, const token& t2);
|
||||
};
|
||||
|
||||
//
|
||||
|
||||
@@ -71,20 +71,15 @@ sstring murmur3_partitioner::to_sstring(const token& t) const {
|
||||
return ::to_sstring(long_token(t));
|
||||
}
|
||||
|
||||
bool murmur3_partitioner::is_equal(const token& t1, const token& t2) {
|
||||
int murmur3_partitioner::tri_compare(const token& t1, const token& t2) {
|
||||
long l1 = long_token(t1);
|
||||
long l2 = long_token(t2);
|
||||
|
||||
auto l1 = long_token(t1);
|
||||
auto l2 = long_token(t2);
|
||||
|
||||
return l1 == l2;
|
||||
}
|
||||
|
||||
bool murmur3_partitioner::is_less(const token& t1, const token& t2) {
|
||||
|
||||
auto l1 = long_token(t1);
|
||||
auto l2 = long_token(t2);
|
||||
|
||||
return l1 < l2;
|
||||
if (l1 == l2) {
|
||||
return 0;
|
||||
} else {
|
||||
return l1 < l2 ? -1 : 1;
|
||||
}
|
||||
}
|
||||
|
||||
token murmur3_partitioner::midpoint(const token& t1, const token& t2) const {
|
||||
|
||||
@@ -18,8 +18,7 @@ public:
|
||||
virtual bool preserves_order() override { return false; }
|
||||
virtual std::map<token, float> describe_ownership(const std::vector<token>& sorted_tokens) override;
|
||||
virtual data_type get_token_validator() override;
|
||||
virtual bool is_equal(const token& t1, const token& t2) override;
|
||||
virtual bool is_less(const token& t1, const token& t2) override;
|
||||
virtual int tri_compare(const token& t1, const token& t2) override;
|
||||
virtual token midpoint(const token& t1, const token& t2) const override;
|
||||
virtual sstring to_sstring(const dht::token& t) const override;
|
||||
virtual unsigned shard_of(const token& t) const override;
|
||||
|
||||
95
tests/perf/perf_sstable.hh
Normal file
95
tests/perf/perf_sstable.hh
Normal file
@@ -0,0 +1,95 @@
|
||||
/*
|
||||
* Copyright 2015 Cloudius Systems
|
||||
*/
|
||||
|
||||
#pragma once
|
||||
#include "../sstable_test.hh"
|
||||
#include "sstables/sstables.hh"
|
||||
#include <boost/accumulators/accumulators.hpp>
|
||||
#include <boost/accumulators/statistics.hpp>
|
||||
#include <boost/range/irange.hpp>
|
||||
|
||||
using namespace sstables;
|
||||
|
||||
class test_env {
|
||||
public:
|
||||
struct conf {
|
||||
unsigned partitions;
|
||||
unsigned key_size;
|
||||
sstring dir;
|
||||
};
|
||||
|
||||
private:
|
||||
sstring dir() {
|
||||
return _cfg.dir + "/" + to_sstring(engine().cpu_id());
|
||||
}
|
||||
|
||||
sstring random_key() {
|
||||
sstring key(sstring::initialized_later{}, size_t(_cfg.key_size));
|
||||
for (auto& b: key) {
|
||||
b = _distribution(_generator);
|
||||
}
|
||||
return key;
|
||||
}
|
||||
|
||||
conf _cfg;
|
||||
schema_ptr s;
|
||||
std::default_random_engine _generator;
|
||||
std::uniform_int_distribution<char> _distribution;
|
||||
lw_shared_ptr<memtable> _mt;
|
||||
std::vector<lw_shared_ptr<sstable>> _sst;
|
||||
|
||||
public:
|
||||
test_env(conf cfg) : _cfg(std::move(cfg))
|
||||
, s(uncompressed_schema())
|
||||
, _distribution('@', '~')
|
||||
, _mt(make_lw_shared<memtable>(s))
|
||||
{}
|
||||
|
||||
future<> stop() { return make_ready_future<>(); }
|
||||
|
||||
void fill_memtable() {
|
||||
for (unsigned i = 0; i < _cfg.partitions; i++) {
|
||||
auto key = partition_key::from_deeply_exploded(*s, { boost::any(random_key()) });
|
||||
_mt->apply(mutation(key, s));
|
||||
}
|
||||
}
|
||||
|
||||
using clk = std::chrono::high_resolution_clock;
|
||||
static auto now() {
|
||||
return clk::now();
|
||||
}
|
||||
|
||||
|
||||
// Mappers below
|
||||
future<size_t> flush_memtable(int idx) {
|
||||
size_t partitions = _mt->partition_count();
|
||||
return test_setup::create_empty_test_dir(dir()).then([this, idx] {
|
||||
auto sst = make_lw_shared<sstable>("ks", "cf", dir(), idx, sstable::version_types::ka, sstable::format_types::big);
|
||||
return sst->write_components(*_mt).then([sst] {});
|
||||
}).then([partitions] {
|
||||
return partitions;
|
||||
});
|
||||
}
|
||||
};
|
||||
|
||||
// The function func should carry on with the test, and return the number of partitions processed.
|
||||
// time_runs will then map reduce it, and return the aggregate partitions / sec for the whole system.
|
||||
template <typename Func>
|
||||
future<> time_runs(unsigned iterations, distributed<test_env>& dt, Func func) {
|
||||
using namespace boost::accumulators;
|
||||
auto acc = make_lw_shared<accumulator_set<double, features<tag::mean, tag::error_of<tag::mean>>>>();
|
||||
auto idx = boost::irange(0, int(iterations));
|
||||
return parallel_for_each(idx.begin(), idx.end(), [acc, &dt, func] (auto idx) {
|
||||
auto start = test_env::now();
|
||||
return dt.map_reduce(adder<size_t>(), func, std::move(idx)).then([start, acc] (size_t partitions) {
|
||||
auto end = test_env::now();
|
||||
auto duration = std::chrono::duration<double>(end - start).count();
|
||||
auto& a = *acc;
|
||||
double result = partitions / duration;
|
||||
a(result);
|
||||
});
|
||||
}).then([acc, iterations] {
|
||||
std::cout << sprint("%.2f", mean(*acc)) << " +- " << sprint("%.2f", error_of<tag::mean>(*acc)) << " partitions / sec (" << iterations << " runs)\n";
|
||||
});
|
||||
}
|
||||
51
tests/perf/perf_sstable_index.cc
Normal file
51
tests/perf/perf_sstable_index.cc
Normal file
@@ -0,0 +1,51 @@
|
||||
/*
|
||||
* Copyright 2015 Cloudius Systems
|
||||
*/
|
||||
|
||||
#include <boost/test/unit_test.hpp>
|
||||
#include <core/distributed.hh>
|
||||
#include <core/app-template.hh>
|
||||
#include <core/sstring.hh>
|
||||
#include <random>
|
||||
#include "perf_sstable.hh"
|
||||
|
||||
using namespace sstables;
|
||||
|
||||
static unsigned iterations = 30;
|
||||
|
||||
// Write-side benchmark: first fills the memtable of every test_env instance,
// then hands test_env::flush_memtable to time_runs() for `iterations` timed
// runs. dt is captured by reference and must outlive the returned future.
future<> test_write(distributed<test_env>& dt) {
    auto populate = [] (test_env& env) {
        env.fill_memtable();
    };
    return dt.invoke_on_all(std::move(populate)).then([&dt] {
        return time_runs(iterations, dt, &test_env::flush_memtable);
    });
}
|
||||
|
||||
int main(int argc, char** argv) {
|
||||
namespace bpo = boost::program_options;
|
||||
app_template app;
|
||||
app.add_options()
|
||||
("iterations", bpo::value<unsigned>()->default_value(30), "number of iterations")
|
||||
("partitions", bpo::value<unsigned>()->default_value(5000000), "number of partitions")
|
||||
("key_size", bpo::value<unsigned>()->default_value(128), "size of partition key")
|
||||
("testdir", bpo::value<sstring>()->default_value("/var/lib/cassandra/perf-tests"), "directory in which to store the sstables");
|
||||
|
||||
return app.run(argc, argv, [&app] {
|
||||
auto test = make_lw_shared<distributed<test_env>>();
|
||||
|
||||
auto cfg = test_env::conf();
|
||||
iterations = app.configuration()["iterations"].as<unsigned>();
|
||||
cfg.partitions = app.configuration()["partitions"].as<unsigned>();
|
||||
cfg.key_size = app.configuration()["key_size"].as<unsigned>();
|
||||
sstring dir = app.configuration()["testdir"].as<sstring>();
|
||||
cfg.dir = dir;
|
||||
return test->start(std::move(cfg)).then([dir, test] {
|
||||
engine().at_exit([test] { return test->stop(); });
|
||||
return test_setup::create_empty_test_dir(dir);
|
||||
}).then([test] {
|
||||
return test_write(*test).then([test] {});
|
||||
}).then([] {
|
||||
return engine().exit(0);
|
||||
}).or_terminate();
|
||||
});
|
||||
}
|
||||
@@ -23,63 +23,6 @@
|
||||
#include <ftw.h>
|
||||
#include <unistd.h>
|
||||
|
||||
class test_setup {
|
||||
file _f;
|
||||
std::function<future<> (directory_entry de)> _walker;
|
||||
subscription<directory_entry> _listing;
|
||||
static sstring path;
|
||||
|
||||
public:
|
||||
test_setup(file f)
|
||||
: _f(std::move(f))
|
||||
, _listing(_f.list_directory([this] (directory_entry de) { return _remove(de); })) {
|
||||
}
|
||||
~test_setup() {
|
||||
_f.close().finally([save = _f] {});
|
||||
}
|
||||
protected:
|
||||
future<> _create_directory(sstring name) {
|
||||
return engine().make_directory(name);
|
||||
}
|
||||
|
||||
future<> _remove(directory_entry de) {
|
||||
if (de.type == directory_entry_type::regular) {
|
||||
return engine().remove_file(path + "/" + de.name);
|
||||
}
|
||||
return make_ready_future<>();
|
||||
}
|
||||
future<> done() { return _listing.done(); }
|
||||
|
||||
static future<> empty_test_dir() {
|
||||
return engine().open_directory(path).then([] (file f) {
|
||||
auto l = make_lw_shared<test_setup>(std::move(f));
|
||||
return l->done().then([l] { });
|
||||
});
|
||||
}
|
||||
static future<> create_empty_test_dir() {
|
||||
return engine().make_directory(path).then_wrapped([] (future<> f) {
|
||||
try {
|
||||
f.get();
|
||||
// it's fine if the directory exists, just shut down the exceptional future message
|
||||
} catch (std::exception& e) {}
|
||||
return empty_test_dir();
|
||||
});
|
||||
}
|
||||
public:
|
||||
static future<> do_with_test_directory(std::function<future<> ()>&& fut);
|
||||
};
|
||||
sstring test_setup::path = "tests/sstables/tests-temporary";
|
||||
|
||||
// Runs `fut` with an existing, emptied test directory. Whether or not the
// preceding steps succeed, the directory is emptied again afterwards and then
// removed.
future<> test_setup::do_with_test_directory(std::function<future<> ()>&& fut) {
    auto cleanup = [] {
        return test_setup::empty_test_dir().then([] {
            return engine().remove_file(path);
        });
    };
    return test_setup::create_empty_test_dir().then([body = std::move(fut)] () mutable {
        return body();
    }).finally(std::move(cleanup));
}
|
||||
|
||||
using namespace sstables;
|
||||
|
||||
static sstring some_keyspace("ks");
|
||||
|
||||
@@ -411,4 +411,74 @@ inline void match_collection_element(const std::pair<bytes, atomic_cell>& elemen
|
||||
BOOST_REQUIRE(element.second.value() == *expected_serialized_value);
|
||||
}
|
||||
}
|
||||
|
||||
class test_setup {
|
||||
file _f;
|
||||
std::function<future<> (directory_entry de)> _walker;
|
||||
sstring _path;
|
||||
subscription<directory_entry> _listing;
|
||||
|
||||
static sstring& path() {
|
||||
static sstring _p = "tests/sstables/tests-temporary";
|
||||
return _p;
|
||||
};
|
||||
|
||||
public:
|
||||
test_setup(file f, sstring path)
|
||||
: _f(std::move(f))
|
||||
, _path(path)
|
||||
, _listing(_f.list_directory([this] (directory_entry de) { return _remove(de); })) {
|
||||
}
|
||||
~test_setup() {
|
||||
_f.close().finally([save = _f] {});
|
||||
}
|
||||
protected:
|
||||
future<> _create_directory(sstring name) {
|
||||
return engine().make_directory(name);
|
||||
}
|
||||
|
||||
future<> _remove(directory_entry de) {
|
||||
sstring t = _path + "/" + de.name;
|
||||
return engine().file_type(t).then([t] (std::experimental::optional<directory_entry_type> det) {
|
||||
auto f = make_ready_future<>();
|
||||
|
||||
if (!det) {
|
||||
throw std::runtime_error("Can't determine file type\n");
|
||||
} else if (det == directory_entry_type::directory) {
|
||||
f = empty_test_dir(t);
|
||||
}
|
||||
return f.then([t] {
|
||||
return engine().remove_file(t);
|
||||
});
|
||||
});
|
||||
}
|
||||
future<> done() { return _listing.done(); }
|
||||
|
||||
static future<> empty_test_dir(sstring p = path()) {
|
||||
return engine().open_directory(p).then([p] (file f) {
|
||||
auto l = make_lw_shared<test_setup>(std::move(f), p);
|
||||
return l->done().then([l] { });
|
||||
});
|
||||
}
|
||||
public:
|
||||
static future<> create_empty_test_dir(sstring p = path()) {
|
||||
return engine().make_directory(p).then_wrapped([p] (future<> f) {
|
||||
try {
|
||||
f.get();
|
||||
// it's fine if the directory exists, just shut down the exceptional future message
|
||||
} catch (std::exception& e) {}
|
||||
return empty_test_dir(p);
|
||||
});
|
||||
}
|
||||
|
||||
static future<> do_with_test_directory(std::function<future<> ()>&& fut, sstring p = path()) {
|
||||
return test_setup::create_empty_test_dir(p).then([fut = std::move(fut), p] () mutable {
|
||||
return fut();
|
||||
}).finally([p] {
|
||||
return test_setup::empty_test_dir(p).then([p] {
|
||||
return engine().remove_file(p);
|
||||
});
|
||||
});
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user