From 700325886db5be79f82c95a3c377dbfe049da42a Mon Sep 17 00:00:00 2001 From: Tomasz Grabiec Date: Mon, 13 Oct 2014 14:01:31 +0200 Subject: [PATCH 01/22] convert non-system includes to use quotes --- net/udp.hh | 6 +++--- tests/futures_test.cc | 4 ++-- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/net/udp.hh b/net/udp.hh index 311c52cc38..4f9a1ba88b 100644 --- a/net/udp.hh +++ b/net/udp.hh @@ -6,11 +6,11 @@ #ifndef UDP_HH_ #define UDP_HH_ -#include -#include -#include #include #include +#include "net/ip.hh" +#include "core/reactor.hh" +#include "core/shared_ptr.hh" #include "net/api.hh" #include "const.hh" diff --git a/tests/futures_test.cc b/tests/futures_test.cc index 5beb3153e4..dcd1a9a64f 100644 --- a/tests/futures_test.cc +++ b/tests/futures_test.cc @@ -2,8 +2,8 @@ * Copyright (C) 2014 Cloudius Systems, Ltd. */ -#include -#include +#include "core/app-template.hh" +#include "core/shared_ptr.hh" #include "test-utils.hh" future<> test_finally_is_called_on_success_and_failure() { From 4e3317b0723998f0a15caf365f1379a5d12c8c55 Mon Sep 17 00:00:00 2001 From: Tomasz Grabiec Date: Sat, 11 Oct 2014 14:30:52 +0200 Subject: [PATCH 02/22] core: fix formatting --- core/app-template.hh | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/app-template.hh b/core/app-template.hh index ac93d717af..e3395aa250 100644 --- a/core/app-template.hh +++ b/core/app-template.hh @@ -53,9 +53,9 @@ public: get_ex(); } catch (std::exception& ex) { std::cout << "program failed with uncaught exception: " << ex.what() << "\n"; - engine.exit(1); + engine.exit(1); } - }); + }); return engine.run(); }; }; From 5f3352b7e3d9e0b3edb8b2d689eefc52df647e66 Mon Sep 17 00:00:00 2001 From: Tomasz Grabiec Date: Wed, 8 Oct 2014 18:29:26 +0200 Subject: [PATCH 03/22] tests: integrate with BOOST UTF Asynchronous test cases can be delared using SEASTAR_TEST_CASE macro, which is equivalent to BOOST_AUTO_TEST_CASE, but the function body is run inside a reactor and can returns a future<> which resolves when the test is done. --- configure.py | 2 +- tests/futures_test.cc | 78 ++++++++++++------------------------------- tests/test-utils.hh | 67 +++++++++++++++++++++++++++---------- tests/timertest.cc | 10 +++++- 4 files changed, 80 insertions(+), 77 deletions(-) diff --git a/configure.py b/configure.py index 9d9a10fc7e..1ff5fd65c4 100755 --- a/configure.py +++ b/configure.py @@ -74,7 +74,7 @@ modes = { }, } -libs = '-laio -lboost_program_options -lboost_system -lstdc++ -lm' +libs = '-laio -lboost_program_options -lboost_system -lstdc++ -lm -lboost_unit_test_framework' hwloc_libs = '-lhwloc -lnuma -lpciaccess -lxml2 -lz' warnings = [ diff --git a/tests/futures_test.cc b/tests/futures_test.cc index dcd1a9a64f..007515bcfb 100644 --- a/tests/futures_test.cc +++ b/tests/futures_test.cc @@ -6,12 +6,11 @@ #include "core/shared_ptr.hh" #include "test-utils.hh" -future<> test_finally_is_called_on_success_and_failure() { +SEASTAR_TEST_CASE(test_finally_is_called_on_success_and_failure) { auto finally1 = make_shared(); auto finally2 = make_shared(); return make_ready_future().then([] { - OK(); }).finally([=] { *finally1 = true; }).then([] { @@ -19,66 +18,50 @@ future<> test_finally_is_called_on_success_and_failure() { }).finally([=] { *finally2 = true; }).rescue([=] (auto get) { - if (!*finally1) { - BUG(); - } - if (!*finally2) { - BUG(); - } + BOOST_REQUIRE(*finally1); + BOOST_REQUIRE(*finally2); // Should be failed. try { get(); - BUG(); - } catch (...) { - OK(); - } + BOOST_REQUIRE(false); + } catch (...) {} }); } -future<> test_exception_from_finally_fails_the_target() { +SEASTAR_TEST_CASE(test_exception_from_finally_fails_the_target) { promise<> pr; auto f = pr.get_future().finally([=] { - OK(); throw std::runtime_error(""); }).then([] { - BUG(); - }).rescue([] (auto get) { - OK(); - }); + BOOST_REQUIRE(false); + }).rescue([] (auto get) {}); pr.set_value(); return f; } -future<> test_exception_from_finally_fails_the_target_on_already_resolved() { +SEASTAR_TEST_CASE(test_exception_from_finally_fails_the_target_on_already_resolved) { return make_ready_future().finally([=] { - OK(); throw std::runtime_error(""); }).then([] { - BUG(); - }).rescue([] (auto get) { - OK(); - }); + BOOST_REQUIRE(false); + }).rescue([] (auto get) {}); } -future<> test_exception_thrown_from_rescue_causes_future_to_fail() -{ +SEASTAR_TEST_CASE(test_exception_thrown_from_rescue_causes_future_to_fail) { return make_ready_future().rescue([] (auto get) { throw std::runtime_error(""); }).rescue([] (auto get) { try { get(); - BUG(); - } catch (...) { - OK(); - } + BOOST_REQUIRE(false); + } catch (...) {} }); } -future<> test_exception_thrown_from_rescue_causes_future_to_fail__async_case() -{ +SEASTAR_TEST_CASE(test_exception_thrown_from_rescue_causes_future_to_fail__async_case) { promise<> p; auto f = p.get_future().rescue([] (auto get) { @@ -86,10 +69,8 @@ future<> test_exception_thrown_from_rescue_causes_future_to_fail__async_case() }).rescue([] (auto get) { try { get(); - BUG(); - } catch (...) { - OK(); - } + BOOST_REQUIRE(false); + } catch (...) {} }); p.set_value(); @@ -97,15 +78,14 @@ future<> test_exception_thrown_from_rescue_causes_future_to_fail__async_case() return f; } -future<> test_failing_intermediate_promise_should_fail_the_master_future() { +SEASTAR_TEST_CASE(test_failing_intermediate_promise_should_fail_the_master_future) { promise<> p1; promise<> p2; auto f = p1.get_future().then([f = std::move(p2.get_future())] () mutable { - OK(); return std::move(f); }).then([] { - BUG(); + BOOST_REQUIRE(false); }); p1.set_value(); @@ -114,23 +94,7 @@ future<> test_failing_intermediate_promise_should_fail_the_master_future() { return f.rescue([](auto get) { try { get(); - BUG(); - } catch (...) { - OK(); - } - }); -} - -int main(int ac, char **av) -{ - return app_template().run(ac, av, [] { - run_tests( - test_finally_is_called_on_success_and_failure() - .then(test_exception_from_finally_fails_the_target) - .then(test_exception_from_finally_fails_the_target_on_already_resolved) - .then(test_exception_thrown_from_rescue_causes_future_to_fail) - .then(test_exception_thrown_from_rescue_causes_future_to_fail__async_case) - .then(test_failing_intermediate_promise_should_fail_the_master_future) - ); + BOOST_REQUIRE(false); + } catch (...) {} }); } diff --git a/tests/test-utils.hh b/tests/test-utils.hh index 555b3b7510..951adc9f76 100644 --- a/tests/test-utils.hh +++ b/tests/test-utils.hh @@ -6,27 +6,58 @@ #define _TEST_UTILS_HH #include -#include +#include +#include "core/future.hh" +#include "core/reactor.hh" +#include "core/app-template.hh" -#define BUG() do { \ - std::cerr << "ERROR @ " << __FILE__ << ":" << __LINE__ << std::endl; \ - throw std::runtime_error("test failed"); \ - } while (0) +using namespace boost::unit_test; -#define OK() { \ - std::cerr << "OK @ " << __FILE__ << ":" << __LINE__ << std::endl; \ - } while (0) +class seastar_test { +public: + seastar_test(); + virtual ~seastar_test() {} + virtual const char* get_name() = 0; + virtual future<> run_test_case() = 0; -static inline -void run_tests(future<> tests_done) { - tests_done.rescue([] (auto get) { - try { - get(); - exit(0); - } catch(...) { - std::terminate(); - } - }); + void run() { + posix_thread t([this] { + engine.when_started().then([this] { + return run_test_case(); + }).rescue([] (auto get) { + try { + get(); + engine.exit(0); + } catch (...) { + std::terminate(); + } + }); + engine.run(); + }); + t.join(); + } +}; + +static std::vector tests; + +seastar_test::seastar_test() { + tests.push_back(this); } +test_suite* init_unit_test_suite(int argc, char* argv[]) { + test_suite* ts = BOOST_TEST_SUITE("seastar-tests"); + for (seastar_test* test : tests) { + ts->add(boost::unit_test::make_test_case([test] { test->run(); }, test->get_name())); + } + return ts; +} + +#define SEASTAR_TEST_CASE(name) \ + struct name : public seastar_test { \ + const char* get_name() override { return #name; } \ + future<> run_test_case() override; \ + }; \ + static name name ## _instance; \ + future<> name::run_test_case() + #endif diff --git a/tests/timertest.cc b/tests/timertest.cc index 93aa584cf6..aca5808077 100644 --- a/tests/timertest.cc +++ b/tests/timertest.cc @@ -4,11 +4,19 @@ #include "core/reactor.hh" #include "core/print.hh" -#include "test-utils.hh" #include using namespace std::chrono_literals; +#define BUG() do { \ + std::cerr << "ERROR @ " << __FILE__ << ":" << __LINE__ << std::endl; \ + throw std::runtime_error("test failed"); \ + } while (0) + +#define OK() { \ + std::cerr << "OK @ " << __FILE__ << ":" << __LINE__ << std::endl; \ + } while (0) + struct timer_test { timer t1; timer t2; From da10ab64438ff10578b06235cbbdba7c8f4e934a Mon Sep 17 00:00:00 2001 From: Tomasz Grabiec Date: Thu, 9 Oct 2014 15:45:26 +0200 Subject: [PATCH 04/22] tests: add sstring tests --- configure.py | 6 ++++-- tests/sstring_test.cc | 17 +++++++++++++++++ 2 files changed, 21 insertions(+), 2 deletions(-) create mode 100644 tests/sstring_test.cc diff --git a/configure.py b/configure.py index 1ff5fd65c4..8140b85c11 100755 --- a/configure.py +++ b/configure.py @@ -12,6 +12,7 @@ tests = [ 'tests/udp_server', 'tests/udp_client', 'tests/blkdiscard_test', + 'tests/sstring_test', ] apps = [ @@ -57,6 +58,7 @@ deps = { 'tests/udp_server': ['tests/udp_server.cc'] + core + libnet, 'tests/udp_client': ['tests/udp_client.cc'] + core + libnet, 'tests/blkdiscard_test': ['tests/blkdiscard_test.cc'] + core, + 'tests/sstring_test': ['tests/sstring_test.cc'] + core, } modes = { @@ -139,9 +141,9 @@ def debug_flag(compiler): src_with_auto = textwrap.dedent('''\ template struct x { auto f() {} }; - + x a; - ''') + ''') if try_compile(source = src_with_auto, flags = ['-g', '-std=gnu++1y'], compiler = compiler): return '-g' else: diff --git a/tests/sstring_test.cc b/tests/sstring_test.cc new file mode 100644 index 0000000000..9c5e60fcdd --- /dev/null +++ b/tests/sstring_test.cc @@ -0,0 +1,17 @@ +/* + * Copyright 2014 Cloudius Systems + */ + +#define BOOST_TEST_DYN_LINK +#define BOOST_TEST_MODULE core + +#include +#include "core/sstring.hh" + +BOOST_AUTO_TEST_CASE(test_equality) { + BOOST_REQUIRE_EQUAL(sstring("aaa"), sstring("aaa")); +} + +BOOST_AUTO_TEST_CASE(test_to_sstring) { + BOOST_REQUIRE_EQUAL(to_sstring(1234567), sstring("1234567")); +} From c4d18b6ae6ecc632056a64aa1747509850d4f337 Mon Sep 17 00:00:00 2001 From: Tomasz Grabiec Date: Thu, 9 Oct 2014 15:46:15 +0200 Subject: [PATCH 05/22] tests: introduce test runner script To run all unit tests: $ ./test.py [3/6] RUNNING build/release/tests/sstring_test --- test.py | 47 +++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 47 insertions(+) create mode 100755 test.py diff --git a/test.py b/test.py new file mode 100755 index 0000000000..0a3a370682 --- /dev/null +++ b/test.py @@ -0,0 +1,47 @@ +#!/usr/bin/env python3 +import os +import sys +import subprocess + +all_tests = [ + 'futures_test', + 'sstring_test', +] + +last_len = 0 + +def print_status(msg): + global last_len + print('\r' + ' '*last_len, end='') + last_len = len(msg) + print('\r' + msg, end='') + +if __name__ == "__main__": + black_hole = open('/dev/null', 'w') + + test_to_run = [] + for mode in ['debug', 'release']: + for test in all_tests: + test_to_run.append(os.path.join('build', mode, 'tests', test)) + + all_ok = True + + n_total = len(test_to_run) + for n, path in enumerate(test_to_run): + prefix = '[%d/%d]' % (n + 1, n_total) + if not os.path.exists(path): + print_status('MISSING: %s\n' % (path)) + all_ok = False + else: + print_status('%s RUNNING %s' % (prefix, path)) + if subprocess.call([path], stdout=black_hole, stderr=black_hole): + print_status('FAILED: %s\n' % (path)) + all_ok = False + else: + print_status('%s PASSED %s' % (prefix, path)) + + if all_ok: + print('\nOK.') + else: + print_status('') + sys.exit(1) From 930181d361e9999e149413a73967ca49d46e15f8 Mon Sep 17 00:00:00 2001 From: Tomasz Grabiec Date: Wed, 8 Oct 2014 18:28:31 +0200 Subject: [PATCH 06/22] net: fix packet constructors Thy did not compile. --- net/packet.hh | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/net/packet.hh b/net/packet.hh index c76cae1471..a35601d352 100644 --- a/net/packet.hh +++ b/net/packet.hh @@ -385,7 +385,7 @@ packet::packet(fragment frag, Deleter d, packet&& x) _impl->_frags + _impl->_nr_frags + 1); ++_impl->_nr_frags; _impl->_frags[0] = frag; - _impl->_deleter.reset(make_deleter(std::move(_impl->_deleter), d)); + _impl->_deleter = make_deleter(std::move(_impl->_deleter), d); } template @@ -394,7 +394,7 @@ packet::packet(packet&& x, fragment frag, Deleter d) : _impl(impl::allocate_if_needed(std::move(x._impl), 1)) { _impl->_len += frag.size; _impl->_frags[_impl->_nr_frags++] = frag; - _impl->_deleter.reset(make_deleter(std::move(_impl->_deleter), d)); + _impl->_deleter = make_deleter(std::move(_impl->_deleter), d); } inline From d97c8eb50cd8d900e4f7ee89885d75456ba6a4aa Mon Sep 17 00:00:00 2001 From: Tomasz Grabiec Date: Wed, 8 Oct 2014 18:30:19 +0200 Subject: [PATCH 07/22] net: introduce packet as data_source adapter --- net/packet-data-source.hh | 35 +++++++++++++++++++++++++++++++++++ 1 file changed, 35 insertions(+) create mode 100644 net/packet-data-source.hh diff --git a/net/packet-data-source.hh b/net/packet-data-source.hh new file mode 100644 index 0000000000..178607f989 --- /dev/null +++ b/net/packet-data-source.hh @@ -0,0 +1,35 @@ +#ifndef _PACKET_DATA_SOURCE_HH +#define _PACKET_DATA_SOURCE_HH + +#include "core/reactor.hh" +#include "net/packet.hh" + +namespace net { + +class packet_data_source final : public data_source_impl { + size_t _cur_frag = 0; + packet _p; +public: + explicit packet_data_source(net::packet&& p) + : _p(std::move(p)) + {} + + virtual future> get() override { + if (_cur_frag != _p.nr_frags()) { + auto& f = _p.fragments()[_cur_frag++]; + return make_ready_future>( + temporary_buffer(f.base, f.size, + make_deleter(deleter(), [p = _p.share()] () mutable {}))); + } + return make_ready_future>(temporary_buffer(0)); + } +}; + +static inline +input_stream as_input_stream(packet&& p) { + return input_stream(data_source(std::make_unique(std::move(p)))); +} + +} + +#endif From 6fabc8700f641caebd2f8ebad86f36faab31e02e Mon Sep 17 00:00:00 2001 From: Tomasz Grabiec Date: Wed, 8 Oct 2014 18:29:11 +0200 Subject: [PATCH 08/22] core: introduce timer::rearm() --- core/reactor.hh | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/core/reactor.hh b/core/reactor.hh index 911a130db8..4c72385b91 100644 --- a/core/reactor.hh +++ b/core/reactor.hh @@ -117,6 +117,7 @@ public: future<> expired(); void set_callback(callback_t&& callback); void arm(clock_type::time_point until, boost::optional period = {}); + void rearm(clock_type::time_point until, boost::optional period = {}); void arm(clock_type::duration delta); void arm_periodic(clock_type::duration delta); bool armed() const { return _armed; } @@ -982,6 +983,14 @@ void timer::arm(clock_type::time_point until, boost::optional period) { + if (_armed) { + cancel(); + } + arm(until, period); +} + inline void timer::arm(clock_type::duration delta) { return arm(clock_type::now() + delta); From ceeda8508090a99b2abf326a0b46dc9d83f5a4fe Mon Sep 17 00:00:00 2001 From: Tomasz Grabiec Date: Thu, 9 Oct 2014 17:15:31 +0200 Subject: [PATCH 09/22] core: add more overloads of output_stream::write() --- core/reactor.hh | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/core/reactor.hh b/core/reactor.hh index 4c72385b91..89fe4edaf0 100644 --- a/core/reactor.hh +++ b/core/reactor.hh @@ -572,10 +572,24 @@ public: output_stream(data_sink fd, size_t size) : _fd(std::move(fd)), _buf(size), _size(size) {} future<> write(const char_type* buf, size_t n); + future<> write(const char_type* buf); + future<> write(const sstring& s); future<> flush(); private: }; +template +inline +future<> output_stream::write(const char_type* buf) { + return write(buf, strlen(buf)); +} + +template +inline +future<> output_stream::write(const sstring& s) { + return write(s.c_str(), s.size()); +} + class file_impl { public: virtual ~file_impl() {} From dec042f9c22397e20b4365309f890b51606fb98e Mon Sep 17 00:00:00 2001 From: Tomasz Grabiec Date: Thu, 9 Oct 2014 17:16:59 +0200 Subject: [PATCH 10/22] core: introduce do_for_each() Useful when composing iteration with async operations. --- core/async-action.hh | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/core/async-action.hh b/core/async-action.hh index f9447c2b1d..02835278d9 100644 --- a/core/async-action.hh +++ b/core/async-action.hh @@ -60,4 +60,19 @@ void keep_doing(AsyncAction&& action) { } } +template +static inline +future<> do_for_each(Iterator begin, Iterator end, AsyncAction&& action) { + while (begin != end) { + auto f = action(*begin++); + if (!f.available()) { + return f.then([action = std::forward(action), + begin = std::move(begin), end = std::move(end)] () mutable { + return do_for_each(std::move(begin), std::move(end), std::forward(action)); + }); + } + } + return make_ready_future<>(); +} + #endif From 277535ca75110bcf002d31f4b96a8c593210ae63 Mon Sep 17 00:00:00 2001 From: Tomasz Grabiec Date: Thu, 9 Oct 2014 20:01:38 +0200 Subject: [PATCH 11/22] core: introduce vector_data_sink --- core/vector-data-sink.hh | 30 ++++++++++++++++++++++++++++++ 1 file changed, 30 insertions(+) create mode 100644 core/vector-data-sink.hh diff --git a/core/vector-data-sink.hh b/core/vector-data-sink.hh new file mode 100644 index 0000000000..ec1a905be0 --- /dev/null +++ b/core/vector-data-sink.hh @@ -0,0 +1,30 @@ +/* + * Copyright 2014 Cloudius Systems + */ + +#ifndef VECTOR_DATA_SINK_HH_ +#define VECTOR_DATA_SINK_HH_ + +#include "core/reactor.hh" + +class vector_data_sink final : public data_sink_impl { +private: + using vector_type = std::vector>; + vector_type& _v; +public: + vector_data_sink(vector_type& v) : _v(v) {} + + virtual future<> put(std::vector> data) override { + for (auto&& buf : data) { + _v.push_back(std::move(buf)); + } + return make_ready_future<>(); + } + + virtual future<> put(temporary_buffer data) override { + _v.push_back(std::move(data)); + return make_ready_future<>(); + } +}; + +#endif From ca077f33ef20fd589f50f4eb4f8ed90138c9f308 Mon Sep 17 00:00:00 2001 From: Tomasz Grabiec Date: Tue, 14 Oct 2014 13:35:17 +0200 Subject: [PATCH 12/22] core: introduce or_terminate() Calls std::terminate() when future fails. It can be used like this: do_something([] {}).or_terminate(); --- core/future.hh | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/core/future.hh b/core/future.hh index 1b8fd60b2b..da489f2988 100644 --- a/core/future.hh +++ b/core/future.hh @@ -433,6 +433,16 @@ public: return f; } + future<> or_terminate() { + return rescue([] (auto get) { + try { + get(); + } catch (...) { + std::terminate(); + } + }); + } + template friend class promise; template From 454ee88bb6da896e667c5b0f87fbc45c9e2244fa Mon Sep 17 00:00:00 2001 From: Tomasz Grabiec Date: Tue, 14 Oct 2014 13:49:21 +0200 Subject: [PATCH 13/22] core: make keep_doing() propagate failure --- core/async-action.hh | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/core/async-action.hh b/core/async-action.hh index 02835278d9..3af904a152 100644 --- a/core/async-action.hh +++ b/core/async-action.hh @@ -45,17 +45,20 @@ future<> do_until(StopCondition&& stop_cond, AsyncAction&& action) { return f; } -// Invoke given action undefinitely. Next invocation starts when previous completes or fails. +// Invoke given action until it fails. template static inline -void keep_doing(AsyncAction&& action) { +future<> keep_doing(AsyncAction&& action) { while (true) { auto f = action(); if (!f.available()) { - f.then([action = std::forward(action)] () mutable { - keep_doing(std::forward(action)); + return f.then([action = std::forward(action)] () mutable { + return keep_doing(std::forward(action)); }); - return; + } + + if (f.failed()) { + return std::move(f); } } } From 88537320f36aec1a12ae10ab7bfaeff792b5b437 Mon Sep 17 00:00:00 2001 From: Tomasz Grabiec Date: Wed, 8 Oct 2014 18:25:27 +0200 Subject: [PATCH 14/22] memcache: initial version Supports UDP and subset of ASCII protocol with the following commands: get, set and delete. --- apps/memcache/ascii.rl | 100 +++++++++++ apps/memcache/memcache.cc | 362 ++++++++++++++++++++++++++++++++++++++ configure.py | 6 + 3 files changed, 468 insertions(+) create mode 100644 apps/memcache/ascii.rl create mode 100644 apps/memcache/memcache.cc diff --git a/apps/memcache/ascii.rl b/apps/memcache/ascii.rl new file mode 100644 index 0000000000..2f5dc53793 --- /dev/null +++ b/apps/memcache/ascii.rl @@ -0,0 +1,100 @@ +/* + * Copyright (C) 2014 Cloudius Systems, Ltd. + */ + +#include "core/ragel.hh" +#include +#include +#include + +%%{ + +machine memcache_ascii_protocol; + +access _fsm_; + +action mark { + g.mark_start(p); +} + +action start_blob { + g.mark_start(p); + _size_left = _size; +} + +action advance_blob { + auto len = std::min((uint32_t)(pe - p), _size_left); + _size_left -= len; + p += len; + if (_size_left == 0) { + _blob = str(); + p--; + fret; + } + p--; +} + +crlf = '\r\n'; +sp = ' '; +u32 = digit+ >{ _u32 = 0; } ${ _u32 *= 10; _u32 += fc - '0'; }; +key = [^ ]+ >mark %{ _key = str(); }; +flags = u32 %{ _flags = _u32; }; +expiration = u32 %{ _expiration = _u32; }; +size = u32 %{ _size = _u32; }; +blob := any+ >start_blob $advance_blob; + +set = "set" sp key sp flags sp expiration sp size (crlf @{ fcall blob; } ) crlf @{ _state = state::cmd_set; }; +get = "get" (sp key %{ _keys.push_back(std::move(_key)); })+ crlf @{ _state = state::cmd_get; }; +delete = "delete" sp key crlf @{ _state = state::cmd_delete; }; + +main := set | get | delete; + +prepush { + prepush(); +} + +postpop { + postpop(); +} + +}%% + +class memcache_ascii_parser : public ragel_parser_base { + %% write data nofinal noprefix; +public: + enum class state { + error, + eof, + cmd_set, + cmd_get, + cmd_delete + }; + state _state; + uint32_t _u32; + sstring _key; + uint32_t _flags; + uint32_t _expiration; + uint32_t _size; + uint32_t _size_left; + sstring _blob; + std::vector _keys; +public: + void init() { + _state = state::error; + _keys.clear(); + %% write init; + } + + char* parse(char* p, char* pe, char* eof) { + sstring_builder::guard g(_builder, p, pe); + auto str = [this, &g, &p] { g.mark_end(p); return get_str(); }; + %% write exec; + if (_state != state::error) { + return p; + } + return nullptr; + } + bool eof() const { + return _state == state::eof; + } +}; diff --git a/apps/memcache/memcache.cc b/apps/memcache/memcache.cc new file mode 100644 index 0000000000..308dcfd66f --- /dev/null +++ b/apps/memcache/memcache.cc @@ -0,0 +1,362 @@ +#include +#include +#include "core/app-template.hh" +#include "core/async-action.hh" +#include "core/timer-set.hh" +#include "core/shared_ptr.hh" +#include "core/stream.hh" +#include "core/vector-data-sink.hh" +#include "net/api.hh" +#include "net/packet-data-source.hh" +#include "apps/memcache/ascii.hh" + +using namespace net; + +namespace bi = boost::intrusive; + +namespace memcache { + +template +using optional = boost::optional; + +using item_key = sstring; + +struct item_data { + sstring _data; + uint32_t _flag; + clock_type::time_point _expiry; +}; + +class item { +private: + item_data _data; + uint64_t _version; + bool _expired; + bi::list_member_hook<> _timer_link; + bi::list_member_hook<> _expired_link; + friend class cache; +public: + item(item_data data) + : _data(std::move(data)) + , _version(1) + , _expired(false) + { + } + + clock_type::time_point get_timeout() { + return _data._expiry; + } + + void update(item_data&& data) { + _data = std::move(data); + _version++; + } + + item_data& data() { + return _data; + } +}; + +class cache { +private: + using cache_type = std::unordered_map>; + using cache_iterator = typename cache_type::iterator; + cache_type _cache; + timer_set _alive; + bi::list, &item::_expired_link>> _expired; + timer _timer; +private: + void expire() { + _alive.expire(clock_type::now()); + while (auto item = _alive.pop_expired()) { + item->_expired = true; + _expired.push_back(*item); + } + _timer.arm(_alive.get_next_timeout()); + } + + inline + cache_iterator find(const item_key& key) { + auto i = _cache.find(key); + if (i != _cache.end()) { + auto& item_ref = *i->second; + if (item_ref._expired) { + _expired.erase(_expired.iterator_to(item_ref)); + _cache.erase(i); + return _cache.end(); + } + } + return i; + } + + inline + void add_overriding(cache_iterator i, item_data&& data) { + auto& item_ref = *i->second; + _alive.remove(item_ref); + item_ref.update(std::move(data)); + if (_alive.insert(item_ref)) { + _timer.rearm(item_ref.get_timeout()); + } + } + + inline + void add_new(item_key&& key, item_data&& data) { + auto r = _cache.emplace(std::move(key), make_shared(std::move(data))); + assert(r.second); + auto& item_ref = *r.first->second; + if (_alive.insert(item_ref)) { + _timer.rearm(item_ref.get_timeout()); + } + } +public: + cache() { + _timer.set_callback([this] { expire(); }); + } + + bool set(item_key&& key, item_data data) { + auto i = find(key); + if (i != _cache.end()) { + add_overriding(i, std::move(data)); + return true; + } else { + add_new(std::move(key), std::move(data)); + return false; + } + } + + bool add(item_key&& key, item_data data) { + auto i = find(key); + if (i != _cache.end()) { + return false; + } + + add_new(std::move(key), std::move(data)); + return true; + } + + bool replace(const item_key& key, item_data data) { + auto i = find(key); + if (i == _cache.end()) { + return false; + } + + add_overriding(i, std::move(data)); + return true; + } + + bool remove(const item_key& key) { + auto i = find(key); + if (i == _cache.end()) { + return false; + } + auto& item_ref = *i->second; + _alive.remove(item_ref); + _cache.erase(i); + return true; + } + + shared_ptr get(const item_key& key) { + auto i = find(key); + if (i == _cache.end()) { + return {}; + } + return i->second; + } +}; + +class ascii_protocol { +private: + cache& _cache; + memcache_ascii_parser _parser; +private: + static constexpr uint32_t seconds_in_a_month = 60 * 60 * 24 * 30; + static constexpr const char *msg_crlf = "\r\n"; + static constexpr const char *msg_error = "ERROR\r\n"; + static constexpr const char *msg_stored = "STORED\r\n"; + static constexpr const char *msg_end = "END\r\n"; + static constexpr const char *msg_value = "VALUE "; + static constexpr const char *msg_deleted = "DELETED\r\n"; + static constexpr const char *msg_not_found = "NOT_FOUND\r\n"; +public: + ascii_protocol(cache& cache) : _cache(cache) {} + + clock_type::time_point seconds_to_time_point(uint32_t seconds) { + if (seconds == 0) { + return clock_type::time_point::max(); + } else if (seconds <= seconds_in_a_month) { + return clock_type::now() + std::chrono::seconds(seconds); + } else { + return clock_type::time_point(std::chrono::seconds(seconds)); + } + } + + future<> handle(input_stream& in, output_stream& out) { + _parser.init(); + return in.consume(_parser).then([this, &out] () -> future<> { + switch (_parser._state) { + case memcache_ascii_parser::state::error: + case memcache_ascii_parser::state::eof: + return out.write(msg_error); + + case memcache_ascii_parser::state::cmd_set: + _cache.set(std::move(_parser._key), + item_data{std::move(_parser._blob), _parser._flags, seconds_to_time_point(_parser._expiration)}); + return out.write(msg_stored); + + case memcache_ascii_parser::state::cmd_get: + { + auto keys_p = make_shared>(std::move(_parser._keys)); + return do_for_each(keys_p->begin(), keys_p->end(), [this, &out, keys_p](auto&& key) mutable { + auto item = _cache.get(key); + if (!item) { + return make_ready_future<>(); + } + return out.write(msg_value) + .then([&out, &key] { + return out.write(key); + }).then([&out] { + return out.write(" "); + }).then([&out, item] { + return out.write(to_sstring(item->data()._flag)); + }).then([&out] { + return out.write(" "); + }).then([&out, item] { + return out.write(to_sstring(item->data()._data.size())); + }).then([&out] { + return out.write(msg_crlf); + }).then([&out, item] { + return out.write(item->data()._data); + }).then([&out] { + return out.write(msg_crlf); + }); + }).then([&out] { + return out.write(msg_end); + }); + } + + case memcache_ascii_parser::state::cmd_delete: + if (_cache.remove(_parser._key)) { + return out.write(msg_deleted); + } + return out.write(msg_not_found); + }; + return make_ready_future<>(); + }); + }; +}; + +void assert_resolved(future<> f) { + assert(f.available()); +} + +class udp_server { +public: + static const size_t default_max_datagram_size = 1400; +private: + ascii_protocol& _proto; + udp_channel _chan; + uint16_t _port; + size_t _max_datagram_size = default_max_datagram_size; + + struct header { + packed _request_id; + packed _sequence_number; + packed _n; + packed _reserved; + + template + auto adjust_endianness(Adjuster a) { + return a(_request_id, _sequence_number, _n); + } + } __attribute__((packed)); + +public: + udp_server(ascii_protocol& proto, uint16_t port = 11211) + : _proto(proto) + , _port(port) + {} + + void set_max_datagram_size(size_t max_datagram_size) { + _max_datagram_size = max_datagram_size; + } + + future<> respond(ipv4_addr dst, uint16_t request_id, std::vector>&& datagrams) { + if (datagrams.size() == 1) { + auto&& buf = datagrams[0]; + auto p = packet(fragment{buf.get_write(), buf.size()}, buf.release()); + header *out_hdr = p.prepend_header
(); + out_hdr->_request_id = request_id; + out_hdr->_sequence_number = 0; + out_hdr->_n = 1; + hton(*out_hdr); + return _chan.send(dst, std::move(p)); + } + + int i = 0; + auto sb = make_shared(std::move(datagrams)); + return do_for_each(sb->begin(), sb->end(), + [this, i, sb, dst, request_id](auto&& buf) mutable { + auto p = packet(fragment{buf.get_write(), buf.size()}, buf.release()); + header *out_hdr = p.prepend_header
(); + out_hdr->_request_id = request_id; + out_hdr->_sequence_number = i++; + out_hdr->_n = sb->size(); + hton(*out_hdr); + return _chan.send(dst, std::move(p)); + }); + } + + void start() { + _chan = engine.net().make_udp_channel({_port}); + keep_doing([this] { + return _chan.receive().then([this](udp_datagram dgram) { + packet& p = dgram.get_data(); + if (p.len() < sizeof(header)) { + // dropping invalid packet + return make_ready_future<>(); + } + + std::vector> out_bufs; + auto out = output_stream(data_sink(std::make_unique(out_bufs)), + _max_datagram_size - sizeof(header)); + + header *hdr = p.get_header
(); + ntoh(*hdr); + p.trim_front(sizeof(*hdr)); + + auto request_id = hdr->_request_id; + + if (hdr->_n != 1 || hdr->_sequence_number != 0) { + out.write("CLIENT_ERROR only single-datagram requests supported\r\n"); + } else { + auto in = as_input_stream(std::move(p)); + assert_resolved(_proto.handle(in, out)); + } + + assert_resolved(out.flush()); + return respond(dgram.get_src(), request_id, std::move(out_bufs)); + }); + }).or_terminate(); + }; +}; + +} /* namespace memcache */ + +int main(int ac, char** av) +{ + memcache::cache cache; + memcache::ascii_protocol ascii_protocol(cache); + memcache::udp_server server(ascii_protocol); + + app_template app; + app.add_options() + ("max-datagram-size", bpo::value()->default_value(memcache::udp_server::default_max_datagram_size), + "Maximum size of UDP datagram") + ; + + return app.run(ac, av, [&server, &app] { + auto&& config = app.configuration(); + server.set_max_datagram_size(config["max-datagram-size"].as()); + server.start(); + }); +} diff --git a/configure.py b/configure.py index 8140b85c11..7aa605d900 100755 --- a/configure.py +++ b/configure.py @@ -18,6 +18,7 @@ tests = [ apps = [ 'apps/httpd/httpd', 'apps/seastar/seastar', + 'apps/memcache/memcache', ] all_artifacts = apps + tests @@ -44,10 +45,15 @@ core = [ 'net/posix-stack.cc', ] +memcache = [ + 'apps/memcache/ascii.rl' +] + libnet + core + deps = { 'apps/seastar/seastar': ['apps/seastar/main.cc'] + core, 'tests/test-reactor': ['tests/test-reactor.cc'] + core, 'apps/httpd/httpd': ['apps/httpd/httpd.cc', 'apps/httpd/request_parser.rl'] + libnet + core, + 'apps/memcache/memcache': ['apps/memcache/memcache.cc'] + memcache, 'tests/fileiotest': ['tests/fileiotest.cc'] + core, 'tests/virtiotest': ['tests/virtiotest.cc'] + core + libnet, 'tests/l3_test': ['tests/l3_test.cc'] + core + libnet, From 97a16c16fda24baed3de935ee57fbc4933850f01 Mon Sep 17 00:00:00 2001 From: Tomasz Grabiec Date: Thu, 9 Oct 2014 17:14:07 +0200 Subject: [PATCH 15/22] core: add operator<< which can print any vector Useful in tests. --- core/sstring.hh | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/core/sstring.hh b/core/sstring.hh index d371514b1d..ee897e2a96 100644 --- a/core/sstring.hh +++ b/core/sstring.hh @@ -243,4 +243,21 @@ string_type to_sstring(unsigned long long value, void* = nullptr) { return to_sstring_sprintf(value, "%llu"); } +template +inline +std::ostream& operator<<(std::ostream& os, const std::vector& v) { + bool first = true; + os << "{"; + for (auto&& elem : v) { + if (!first) { + os << ", "; + } else { + first = false; + } + os << elem; + } + os << "}"; + return os; +} + #endif /* SSTRING_HH_ */ From 6a31f3762c5bb72d9cf762cc3293ffd4c13422cc Mon Sep 17 00:00:00 2001 From: Tomasz Grabiec Date: Thu, 9 Oct 2014 17:52:33 +0200 Subject: [PATCH 16/22] tests: memcache: add test for ASCII parser --- configure.py | 2 + test.py | 1 + tests/memcache/test_ascii_parser.cc | 232 ++++++++++++++++++++++++++++ 3 files changed, 235 insertions(+) create mode 100644 tests/memcache/test_ascii_parser.cc diff --git a/configure.py b/configure.py index 7aa605d900..e482ff138e 100755 --- a/configure.py +++ b/configure.py @@ -13,6 +13,7 @@ tests = [ 'tests/udp_client', 'tests/blkdiscard_test', 'tests/sstring_test', + 'tests/memcache/test_ascii_parser', ] apps = [ @@ -54,6 +55,7 @@ deps = { 'tests/test-reactor': ['tests/test-reactor.cc'] + core, 'apps/httpd/httpd': ['apps/httpd/httpd.cc', 'apps/httpd/request_parser.rl'] + libnet + core, 'apps/memcache/memcache': ['apps/memcache/memcache.cc'] + memcache, + 'tests/memcache/test_ascii_parser': ['tests/memcache/test_ascii_parser.cc'] + memcache, 'tests/fileiotest': ['tests/fileiotest.cc'] + core, 'tests/virtiotest': ['tests/virtiotest.cc'] + core + libnet, 'tests/l3_test': ['tests/l3_test.cc'] + core + libnet, diff --git a/test.py b/test.py index 0a3a370682..bf387c3f43 100755 --- a/test.py +++ b/test.py @@ -5,6 +5,7 @@ import subprocess all_tests = [ 'futures_test', + 'memcache/test_ascii_parser', 'sstring_test', ] diff --git a/tests/memcache/test_ascii_parser.cc b/tests/memcache/test_ascii_parser.cc new file mode 100644 index 0000000000..7288ad6db3 --- /dev/null +++ b/tests/memcache/test_ascii_parser.cc @@ -0,0 +1,232 @@ +/* + * Copyright (C) 2014 Cloudius Systems, Ltd. + */ + +#include +#include +#include "tests/test-utils.hh" +#include "core/shared_ptr.hh" +#include "net/packet-data-source.hh" +#include "apps/memcache/ascii.hh" + +using namespace net; + +using parser_type = memcache_ascii_parser; + +static packet make_packet(std::vector chunks) { + packet p; + for (auto&& chunk : chunks) { + size_t size = chunk.size(); + char* b = new char[size]; + memcpy(b, chunk.c_str(), size); + p = packet(std::move(p), fragment{b, size}, [b] { delete[] b; }); + } + return p; +} + +static auto make_input_stream(packet&& p) { + return input_stream(data_source( + std::make_unique(std::move(p)))); +} + +static auto parse(packet&& p) { + auto is = make_input_stream(std::move(p)); + auto parser = make_shared(); + parser->init(); + return is.consume(*parser).then([parser] { + return make_ready_future>(parser); + }); +} + +SEASTAR_TEST_CASE(test_set_command_is_parsed) { + return parse(make_packet({"set key 1 2 3\r\nabc\r\n"})) + .then([] (auto p) { + BOOST_REQUIRE(p->_state == parser_type::state::cmd_set); + BOOST_REQUIRE(p->_flags == 1); + BOOST_REQUIRE(p->_expiration == 2); + BOOST_REQUIRE(p->_size == 3); + BOOST_REQUIRE(p->_key == "key"); + BOOST_REQUIRE(p->_blob == "abc"); + }); +} + +SEASTAR_TEST_CASE(test_empty_data_is_parsed) { + return parse(make_packet({"set key 1 2 0\r\n\r\n"})) + .then([] (auto p) { + BOOST_REQUIRE(p->_state == parser_type::state::cmd_set); + BOOST_REQUIRE(p->_flags == 1); + BOOST_REQUIRE(p->_expiration == 2); + BOOST_REQUIRE(p->_size == 0); + BOOST_REQUIRE(p->_key == "key"); + BOOST_REQUIRE(p->_blob == ""); + }); +} + +SEASTAR_TEST_CASE(test_superflous_data_is_an_error) { + return parse(make_packet({"set key 0 0 0\r\nasd\r\n"})) + .then([] (auto p) { + BOOST_REQUIRE(p->_state == parser_type::state::error); + }); +} + +SEASTAR_TEST_CASE(test_not_enough_data_is_an_error) { + return parse(make_packet({"set key 0 0 3\r\n"})) + .then([] (auto p) { + BOOST_REQUIRE(p->_state == parser_type::state::error); + }); +} + +SEASTAR_TEST_CASE(test_u32_parsing) { + return make_ready_future<>() + .then([] { + return parse(make_packet({"set key 0 0 0\r\n\r\n"})) + .then([] (auto p) { + BOOST_REQUIRE(p->_state == parser_type::state::cmd_set); + BOOST_REQUIRE(p->_flags == 0); + }); + }).then([] { + return parse(make_packet({"set key 12345 0 0\r\n\r\n"})) + .then([] (auto p) { + BOOST_REQUIRE(p->_state == parser_type::state::cmd_set); + BOOST_REQUIRE(p->_flags == 12345); + }); + }).then([] { + return parse(make_packet({"set key -1 0 0\r\n\r\n"})) + .then([] (auto p) { + BOOST_REQUIRE(p->_state == parser_type::state::error); + }); + }).then([] { + return parse(make_packet({"set key 1-1 0 0\r\n\r\n"})) + .then([] (auto p) { + BOOST_REQUIRE(p->_state == parser_type::state::error); + }); + }).then([] { + return parse(make_packet({"set key " + std::to_string(std::numeric_limits::max()) + " 0 0\r\n\r\n"})) + .then([] (auto p) { + BOOST_REQUIRE(p->_state == parser_type::state::cmd_set); + BOOST_REQUIRE(p->_flags == std::numeric_limits::max()); + }); + }); +} + +SEASTAR_TEST_CASE(test_parsing_of_split_data) { + return make_ready_future<>() + .then([] { + return parse(make_packet({"set key 11", "1 222 3\r\nasd\r\n"})) + .then([] (auto p) { + BOOST_REQUIRE(p->_state == parser_type::state::cmd_set); + BOOST_REQUIRE(p->_key == "key"); + BOOST_REQUIRE(p->_flags == 111); + BOOST_REQUIRE(p->_expiration == 222); + BOOST_REQUIRE(p->_size == 3); + BOOST_REQUIRE(p->_blob == "asd"); + }); + }).then([] { + return parse(make_packet({"set key 11", "1 22", "2 3", "\r\nasd\r\n"})) + .then([] (auto p) { + BOOST_REQUIRE(p->_state == parser_type::state::cmd_set); + BOOST_REQUIRE(p->_key == "key"); + BOOST_REQUIRE(p->_flags == 111); + BOOST_REQUIRE(p->_expiration == 222); + BOOST_REQUIRE(p->_size == 3); + BOOST_REQUIRE(p->_blob == "asd"); + }); + }).then([] { + return parse(make_packet({"set k", "ey 11", "1 2", "2", "2 3", "\r\nasd\r\n"})) + .then([] (auto p) { + BOOST_REQUIRE(p->_state == parser_type::state::cmd_set); + BOOST_REQUIRE(p->_key == "key"); + BOOST_REQUIRE(p->_flags == 111); + BOOST_REQUIRE(p->_expiration == 222); + BOOST_REQUIRE(p->_size == 3); + BOOST_REQUIRE(p->_blob == "asd"); + }); + }).then([] { + return parse(make_packet({"set key 111 222 3\r\n", "asd\r\n"})) + .then([] (auto p) { + BOOST_REQUIRE(p->_state == parser_type::state::cmd_set); + BOOST_REQUIRE(p->_key == "key"); + BOOST_REQUIRE(p->_flags == 111); + BOOST_REQUIRE(p->_expiration == 222); + BOOST_REQUIRE(p->_size == 3); + BOOST_REQUIRE(p->_blob == "asd"); + }); + }).then([] { + return parse(make_packet({"set key 111 222 3\r\na", "sd\r\n"})) + .then([] (auto p) { + BOOST_REQUIRE(p->_state == parser_type::state::cmd_set); + BOOST_REQUIRE(p->_key == "key"); + BOOST_REQUIRE(p->_flags == 111); + BOOST_REQUIRE(p->_expiration == 222); + BOOST_REQUIRE(p->_size == 3); + BOOST_REQUIRE(p->_blob == "asd"); + }); + }).then([] { + return parse(make_packet({"set key 111 222 3\r\nasd", "\r\n"})) + .then([] (auto p) { + BOOST_REQUIRE(p->_state == parser_type::state::cmd_set); + BOOST_REQUIRE(p->_key == "key"); + BOOST_REQUIRE(p->_flags == 111); + BOOST_REQUIRE(p->_expiration == 222); + BOOST_REQUIRE(p->_size == 3); + BOOST_REQUIRE(p->_blob == "asd"); + }); + }).then([] { + return parse(make_packet({"set key 111 222 3\r\nasd\r", "\n"})) + .then([] (auto p) { + BOOST_REQUIRE(p->_state == parser_type::state::cmd_set); + BOOST_REQUIRE(p->_key == "key"); + BOOST_REQUIRE(p->_flags == 111); + BOOST_REQUIRE(p->_expiration == 222); + BOOST_REQUIRE(p->_size == 3); + BOOST_REQUIRE(p->_blob == "asd"); + }); + }); +} + +SEASTAR_TEST_CASE(test_get_parsing) { + return make_ready_future<>() + .then([] { + return parse(make_packet({"get key1\r\n"})) + .then([] (auto p) { + BOOST_REQUIRE(p->_state == parser_type::state::cmd_get); + BOOST_REQUIRE_EQUAL(p->_keys, std::vector({"key1"})); + }); + }).then([] { + return parse(make_packet({"get key1 key2\r\n"})) + .then([] (auto p) { + BOOST_REQUIRE(p->_state == parser_type::state::cmd_get); + BOOST_REQUIRE_EQUAL(p->_keys, std::vector({"key1", "key2"})); + }); + }).then([] { + return parse(make_packet({"get key1 key2 key3\r\n"})) + .then([] (auto p) { + BOOST_REQUIRE(p->_state == parser_type::state::cmd_get); + BOOST_REQUIRE_EQUAL(p->_keys, std::vector({"key1", "key2", "key3"})); + }); + }); +} + +SEASTAR_TEST_CASE(test_multiple_requests_in_one_stream) { + auto p = make_shared(); + auto is = make_shared(make_input_stream(make_packet({"set key1 1 1 5\r\ndata1\r\nset key2 2 2 6\r\ndata2+\r\n"}))); + p->init(); + return is->consume(*p).then([p] { + BOOST_REQUIRE(p->_state == parser_type::state::cmd_set); + BOOST_REQUIRE(p->_key == "key1"); + BOOST_REQUIRE(p->_flags == 1); + BOOST_REQUIRE(p->_expiration == 1); + BOOST_REQUIRE(p->_size == 5); + BOOST_REQUIRE(p->_blob == "data1"); + }).then([is, p] { + p->init(); + return is->consume(*p).then([p] { + BOOST_REQUIRE(p->_state == parser_type::state::cmd_set); + BOOST_REQUIRE(p->_key == "key2"); + BOOST_REQUIRE(p->_flags == 2); + BOOST_REQUIRE(p->_expiration == 2); + BOOST_REQUIRE(p->_size == 6); + BOOST_REQUIRE(p->_blob == "data2+"); + }); + }); +} From 50feeef580204599b9d542cd09130d31cc9409cc Mon Sep 17 00:00:00 2001 From: Tomasz Grabiec Date: Thu, 9 Oct 2014 20:03:06 +0200 Subject: [PATCH 17/22] tests: memcache: introduce end-to-end tests These tests exercise memcache instance using a real client. The test_memcache.py suite can be run against any memcache instance which conforms to the ASCII protocol. --- test.py | 13 ++--- tests/memcache/test.py | 19 +++++++ tests/memcache/test_memcache.py | 98 +++++++++++++++++++++++++++++++++ 3 files changed, 122 insertions(+), 8 deletions(-) create mode 100755 tests/memcache/test.py create mode 100755 tests/memcache/test_memcache.py diff --git a/test.py b/test.py index bf387c3f43..2e2e7e0b44 100755 --- a/test.py +++ b/test.py @@ -24,22 +24,19 @@ if __name__ == "__main__": for mode in ['debug', 'release']: for test in all_tests: test_to_run.append(os.path.join('build', mode, 'tests', test)) + test_to_run.append('tests/memcache/test.py ' + os.path.join('build', mode, 'apps', 'memcache', 'memcache')) all_ok = True n_total = len(test_to_run) for n, path in enumerate(test_to_run): prefix = '[%d/%d]' % (n + 1, n_total) - if not os.path.exists(path): - print_status('MISSING: %s\n' % (path)) + print_status('%s RUNNING %s' % (prefix, path)) + if subprocess.call(path.split(' '), stdout=black_hole, stderr=black_hole): + print_status('FAILED: %s\n' % (path)) all_ok = False else: - print_status('%s RUNNING %s' % (prefix, path)) - if subprocess.call([path], stdout=black_hole, stderr=black_hole): - print_status('FAILED: %s\n' % (path)) - all_ok = False - else: - print_status('%s PASSED %s' % (prefix, path)) + print_status('%s PASSED %s' % (prefix, path)) if all_ok: print('\nOK.') diff --git a/tests/memcache/test.py b/tests/memcache/test.py new file mode 100755 index 0000000000..b48b41497b --- /dev/null +++ b/tests/memcache/test.py @@ -0,0 +1,19 @@ +#!/usr/bin/env python3 +import time +import subprocess +import sys + +if len(sys.argv) < 2: + print('Usage: %s ' % sys.argv[0]) + +memcache_path = sys.argv[1] + +mc = subprocess.Popen([memcache_path]) +print('Memcache started.') +try: + time.sleep(0.1) + print('Starting tests...') + subprocess.check_call(['tests/memcache/test_memcache.py']) +finally: + print('Killing memcache...') + mc.kill() diff --git a/tests/memcache/test_memcache.py b/tests/memcache/test_memcache.py new file mode 100755 index 0000000000..48e8a00485 --- /dev/null +++ b/tests/memcache/test_memcache.py @@ -0,0 +1,98 @@ +#!/usr/bin/env python3 +import socket +import struct +import random +import time +import unittest + +server_addr = ('127.0.0.1', 11211) + +def udp_call(server_addr, msg): + sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + this_req_id = random.randint(-32768, 32767) + + datagram = struct.pack(">hhhh", this_req_id, 0, 1, 0) + msg.encode() + sock.sendto(datagram, server_addr) + + messages = {} + n_determined = None + while True: + data, addr = sock.recvfrom(1500) + req_id, seq, n, res = struct.unpack_from(">hhhh", data) + content = data[8:] + + if n_determined and n_determined != n: + raise Exception('Inconsitent number of total messages, %d and %d' % (n_determined, n)) + n_determined = n + + if req_id != this_req_id: + raise Exception('Invalid request id: ' + req_id + ', expected ' + this_req_id) + + if seq in messages: + raise Exception('Duplicate message for seq=' + seq) + + messages[seq] = content + if len(messages) == n: + break + + msg = b'' + for k, v in sorted(messages.items(), key=lambda e: e[0]): + msg += v + + sock.close() + return msg + +def call(cmd): + return udp_call(server_addr, cmd) + +class TestUDPProtocol(unittest.TestCase): + def call_set(self, key, value, call_fn=call, flags=0, expiry=0): + self.assertEqual(call_fn('set %s %d %d %d\r\n%s\r\n' % (key, flags, expiry, len(value), value)), b'STORED\r\n') + + def call_delete(self, key, call_fn=call): + self.assertEqual(call_fn('delete %s\r\n' % key), b'DELETED\r\n') + + def test_basic_commands(self): + self.assertEqual(call('get key\r\n'), b'END\r\n') + self.assertEqual(call('set key 0 0 5\r\nhello\r\n'), b'STORED\r\n') + self.assertEqual(call('get key\r\n'), b'VALUE key 0 5\r\nhello\r\nEND\r\n') + self.assertEqual(call('delete key\r\n'), b'DELETED\r\n') + self.assertEqual(call('delete key\r\n'), b'NOT_FOUND\r\n') + self.assertEqual(call('get key\r\n'), b'END\r\n') + + def test_expiry(self): + self.assertEqual(call('set key 0 1 5\r\nhello\r\n'), b'STORED\r\n') + self.assertEqual(call('get key\r\n'), b'VALUE key 0 5\r\nhello\r\nEND\r\n') + time.sleep(1) + self.assertEqual(call('get key\r\n'), b'END\r\n') + + def test_expiry_at_epoch_time(self): + expiry = int(time.time()) + 1 + self.assertEqual(call('set key 0 %d 5\r\nhello\r\n' % expiry), b'STORED\r\n') + self.assertEqual(call('get key\r\n'), b'VALUE key 0 5\r\nhello\r\nEND\r\n') + time.sleep(2) + self.assertEqual(call('get key\r\n'), b'END\r\n') + + def test_mutliple_keys_in_get(self): + self.assertEqual(call('set key1 0 0 2\r\nv1\r\n'), b'STORED\r\n') + self.assertEqual(call('set key 0 0 2\r\nv2\r\n'), b'STORED\r\n') + self.assertEqual(call('get key1 key\r\n'), b'VALUE key1 0 2\r\nv1\r\nVALUE key 0 2\r\nv2\r\nEND\r\n') + + def test_response_spanning_many_datagrams(self): + key1_data = '1' * 1000 + key2_data = '2' * 1000 + key3_data = '3' * 1000 + self.call_set('key1', key1_data) + self.call_set('key2', key2_data) + self.call_set('key3', key3_data) + self.assertEqual(call('get key1 key2 key3\r\n').decode(), + 'VALUE key1 0 %d\r\n%s\r\n' \ + 'VALUE key2 0 %d\r\n%s\r\n' \ + 'VALUE key3 0 %d\r\n%s\r\n' \ + 'END\r\n' % (len(key1_data), key1_data, len(key2_data), key2_data, len(key3_data), key3_data)) + self.call_delete('key1') + self.call_delete('key2') + self.call_delete('key3') + +if __name__ == '__main__': + unittest.main() From d575ad2dfbebd4a10fbe77536319401d254f975b Mon Sep 17 00:00:00 2001 From: Tomasz Grabiec Date: Tue, 14 Oct 2014 13:28:47 +0200 Subject: [PATCH 18/22] memcache: introduce periodic stats printer --- apps/memcache/memcache.cc | 54 ++++++++++++++++++++++++++++++++++++++- 1 file changed, 53 insertions(+), 1 deletion(-) diff --git a/apps/memcache/memcache.cc b/apps/memcache/memcache.cc index 308dcfd66f..6d9a418756 100644 --- a/apps/memcache/memcache.cc +++ b/apps/memcache/memcache.cc @@ -1,5 +1,6 @@ #include #include +#include #include "core/app-template.hh" #include "core/async-action.hh" #include "core/timer-set.hh" @@ -57,6 +58,13 @@ public: } }; +struct cache_stats { + size_t _get_hits {}; + size_t _get_misses {}; + size_t _set_adds {}; + size_t _set_replaces {}; +}; + class cache { private: using cache_type = std::unordered_map>; @@ -65,6 +73,7 @@ private: timer_set _alive; bi::list, &item::_expired_link>> _expired; timer _timer; + cache_stats _stats; private: void expire() { _alive.expire(clock_type::now()); @@ -117,9 +126,11 @@ public: auto i = find(key); if (i != _cache.end()) { add_overriding(i, std::move(data)); + _stats._set_replaces++; return true; } else { add_new(std::move(key), std::move(data)); + _stats._set_adds++; return false; } } @@ -158,10 +169,20 @@ public: shared_ptr get(const item_key& key) { auto i = find(key); if (i == _cache.end()) { + _stats._get_misses++; return {}; } + _stats._get_hits++; return i->second; } + + size_t size() { + return _cache.size(); + } + + cache_stats& stats() { + return _stats; + } }; class ascii_protocol { @@ -340,6 +361,31 @@ public: }; }; +class stats_printer { +private: + timer _timer; + cache& _cache; +public: + stats_printer(cache& cache) + : _cache(cache) {} + + void start() { + _timer.set_callback([this] { + auto stats = _cache.stats(); + auto gets_total = stats._get_hits + stats._get_misses; + auto get_hit_rate = gets_total ? ((double)stats._get_hits * 100 / gets_total) : 0; + auto sets_total = stats._set_adds + stats._set_replaces; + auto set_replace_rate = sets_total ? ((double)stats._set_replaces * 100/ sets_total) : 0; + std::cout << "items: " << _cache.size() << " " + << std::setprecision(2) << std::fixed + << "get: " << stats._get_hits << "/" << gets_total << " (" << get_hit_rate << "%) " + << "set: " << stats._set_replaces << "/" << sets_total << " (" << set_replace_rate << "%) " + << std::endl; + }); + _timer.arm_periodic(std::chrono::seconds(1)); + } +}; + } /* namespace memcache */ int main(int ac, char** av) @@ -347,16 +393,22 @@ int main(int ac, char** av) memcache::cache cache; memcache::ascii_protocol ascii_protocol(cache); memcache::udp_server server(ascii_protocol); + memcache::stats_printer stats(cache); app_template app; app.add_options() ("max-datagram-size", bpo::value()->default_value(memcache::udp_server::default_max_datagram_size), "Maximum size of UDP datagram") + ("stats", + "Print basic statistics periodically (every second)") ; - return app.run(ac, av, [&server, &app] { + return app.run(ac, av, [&server, &stats, &app] { auto&& config = app.configuration(); server.set_max_datagram_size(config["max-datagram-size"].as()); + if (config.count("stats")) { + stats.start(); + } server.start(); }); } From a4eba9b03122a10888bf61c7263232fdbc56cf7d Mon Sep 17 00:00:00 2001 From: Tomasz Grabiec Date: Tue, 14 Oct 2014 17:23:40 +0200 Subject: [PATCH 19/22] posix: suppress SIGPIPE When socket was closed by the other side and we try to write into it the process gets SIGPIPE. We should suppress it, so that failure is handled locally. Now the failure will percolate up as an exception. --- core/reactor.hh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/reactor.hh b/core/reactor.hh index 89fe4edaf0..414cb70d5d 100644 --- a/core/reactor.hh +++ b/core/reactor.hh @@ -751,7 +751,7 @@ inline future reactor::write_some(pollable_fd_state& fd, const void* buffer, size_t len) { return writeable(fd).then([this, &fd, buffer, len] () mutable { - auto r = fd.fd.send(buffer, len, 0); + auto r = fd.fd.send(buffer, len, MSG_NOSIGNAL); if (!r) { return write_some(fd, buffer, len); } From 319ae0b530107dce510c27f45d21b4ad696ef07d Mon Sep 17 00:00:00 2001 From: Tomasz Grabiec Date: Tue, 14 Oct 2014 15:00:00 +0200 Subject: [PATCH 20/22] core: add shared_ptr::operator=(T&&) Allows to move an item directly into existing shread_ptr. --- core/shared_ptr.hh | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/core/shared_ptr.hh b/core/shared_ptr.hh index 90a2294e09..44008916c1 100644 --- a/core/shared_ptr.hh +++ b/core/shared_ptr.hh @@ -71,6 +71,11 @@ public: } return *this; } + shared_ptr& operator=(T&& x) { + this->~shared_ptr(); + new (this) shared_ptr(new data(std::move(x))); + return *this; + } T& operator*() const { return _p->_value; } T* operator->() const { return &_p->_value; } From ec42ca860a6c5cb1cf0d05ff7a5d7d74e1f2aadc Mon Sep 17 00:00:00 2001 From: Tomasz Grabiec Date: Tue, 14 Oct 2014 15:00:56 +0200 Subject: [PATCH 21/22] memcache: add TCP server --- apps/memcache/memcache.cc | 48 +++++++++++++++++++++++++++++++++++---- 1 file changed, 44 insertions(+), 4 deletions(-) diff --git a/apps/memcache/memcache.cc b/apps/memcache/memcache.cc index 6d9a418756..e029f2717e 100644 --- a/apps/memcache/memcache.cc +++ b/apps/memcache/memcache.cc @@ -361,6 +361,44 @@ public: }; }; +class tcp_server { +private: + shared_ptr _listener; + cache& _cache; + uint16_t _port; + struct connection { + connected_socket _socket; + socket_address _addr; + input_stream _in; + output_stream _out; + ascii_protocol _proto; + connection(connected_socket&& socket, socket_address addr, cache& c) + : _socket(std::move(socket)) + , _addr(addr) + , _in(_socket.input()) + , _out(_socket.output()) + , _proto(c) + {} + }; +public: + tcp_server(cache& cache, uint16_t port = 11211) : _cache(cache), _port(port) {} + void start() { + listen_options lo; + lo.reuse_address = true; + _listener = engine.listen(make_ipv4_address({_port}), lo); + keep_doing([this] { + return _listener->accept().then([this] (connected_socket fd, socket_address addr) mutable { + auto conn = make_shared(std::move(fd), addr, _cache); + keep_doing([this, conn] { + return conn->_proto.handle(conn->_in, conn->_out).then([conn] { + return conn->_out.flush(); + }); + }); + }); + }).or_terminate(); + } +}; + class stats_printer { private: timer _timer; @@ -392,7 +430,8 @@ int main(int ac, char** av) { memcache::cache cache; memcache::ascii_protocol ascii_protocol(cache); - memcache::udp_server server(ascii_protocol); + memcache::udp_server udp_server(ascii_protocol); + memcache::tcp_server tcp_server(cache); memcache::stats_printer stats(cache); app_template app; @@ -403,12 +442,13 @@ int main(int ac, char** av) "Print basic statistics periodically (every second)") ; - return app.run(ac, av, [&server, &stats, &app] { + return app.run(ac, av, [&] { auto&& config = app.configuration(); - server.set_max_datagram_size(config["max-datagram-size"].as()); + udp_server.set_max_datagram_size(config["max-datagram-size"].as()); if (config.count("stats")) { stats.start(); } - server.start(); + udp_server.start(); + tcp_server.start(); }); } From bb5d1164b18e7730c5058c87fc1a3635df413dcb Mon Sep 17 00:00:00 2001 From: Tomasz Grabiec Date: Tue, 14 Oct 2014 19:13:28 +0200 Subject: [PATCH 22/22] tests: memcache: test both TCP and UDP --- tests/memcache/test.py | 23 +++++++++++------- tests/memcache/test_memcache.py | 42 +++++++++++++++++++++++++-------- 2 files changed, 46 insertions(+), 19 deletions(-) diff --git a/tests/memcache/test.py b/tests/memcache/test.py index b48b41497b..84fa77e95d 100755 --- a/tests/memcache/test.py +++ b/tests/memcache/test.py @@ -8,12 +8,17 @@ if len(sys.argv) < 2: memcache_path = sys.argv[1] -mc = subprocess.Popen([memcache_path]) -print('Memcache started.') -try: - time.sleep(0.1) - print('Starting tests...') - subprocess.check_call(['tests/memcache/test_memcache.py']) -finally: - print('Killing memcache...') - mc.kill() +def run(cmd): + mc = subprocess.Popen([memcache_path]) + print('Memcache started.') + try: + time.sleep(0.1) + cmdline = ['tests/memcache/test_memcache.py'] + cmd + print('Running: ' + ' '.join(cmdline)) + subprocess.check_call(cmdline) + finally: + print('Killing memcache...') + mc.kill() + +run([]) +run(['-U']) diff --git a/tests/memcache/test_memcache.py b/tests/memcache/test_memcache.py index 48e8a00485..def7f842ad 100755 --- a/tests/memcache/test_memcache.py +++ b/tests/memcache/test_memcache.py @@ -2,10 +2,20 @@ import socket import struct import random +import argparse import time import unittest -server_addr = ('127.0.0.1', 11211) +server_addr = None +call = None + +def tcp_call(server_addr, msg): + s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + s.connect(server_addr) + s.send(msg.encode()) + data = s.recv(16*1024) + s.close() + return data def udp_call(server_addr, msg): sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) @@ -42,15 +52,12 @@ def udp_call(server_addr, msg): sock.close() return msg -def call(cmd): - return udp_call(server_addr, cmd) +class TestCommands(unittest.TestCase): + def call_set(self, key, value, flags=0, expiry=0): + self.assertEqual(call('set %s %d %d %d\r\n%s\r\n' % (key, flags, expiry, len(value), value)), b'STORED\r\n') -class TestUDPProtocol(unittest.TestCase): - def call_set(self, key, value, call_fn=call, flags=0, expiry=0): - self.assertEqual(call_fn('set %s %d %d %d\r\n%s\r\n' % (key, flags, expiry, len(value), value)), b'STORED\r\n') - - def call_delete(self, key, call_fn=call): - self.assertEqual(call_fn('delete %s\r\n' % key), b'DELETED\r\n') + def call_delete(self, key): + self.assertEqual(call('delete %s\r\n' % key), b'DELETED\r\n') def test_basic_commands(self): self.assertEqual(call('get key\r\n'), b'END\r\n') @@ -95,4 +102,19 @@ class TestUDPProtocol(unittest.TestCase): self.call_delete('key3') if __name__ == '__main__': - unittest.main() + parser = argparse.ArgumentParser(description="memcache protocol tests") + parser.add_argument('--server', '-s', action="store", help="server adddress in : format", default="localhost:11211") + parser.add_argument('--udp', '-U', action="store_true", help="Use UDP protocol") + args = parser.parse_args() + + host, port = args.server.split(':') + server_addr = (host, int(port)) + + if args.udp: + call = lambda msg: udp_call(server_addr, msg) + else: + call = lambda msg: tcp_call(server_addr, msg) + + runner = unittest.TextTestRunner() + itersuite = unittest.TestLoader().loadTestsFromTestCase(TestCommands) + runner.run(itersuite)