diff --git a/apps/memcache/ascii.rl b/apps/memcache/ascii.rl new file mode 100644 index 0000000000..2f5dc53793 --- /dev/null +++ b/apps/memcache/ascii.rl @@ -0,0 +1,100 @@ +/* + * Copyright (C) 2014 Cloudius Systems, Ltd. + */ + +#include "core/ragel.hh" +#include +#include +#include + +%%{ + +machine memcache_ascii_protocol; + +access _fsm_; + +action mark { + g.mark_start(p); +} + +action start_blob { + g.mark_start(p); + _size_left = _size; +} + +action advance_blob { + auto len = std::min((uint32_t)(pe - p), _size_left); + _size_left -= len; + p += len; + if (_size_left == 0) { + _blob = str(); + p--; + fret; + } + p--; +} + +crlf = '\r\n'; +sp = ' '; +u32 = digit+ >{ _u32 = 0; } ${ _u32 *= 10; _u32 += fc - '0'; }; +key = [^ ]+ >mark %{ _key = str(); }; +flags = u32 %{ _flags = _u32; }; +expiration = u32 %{ _expiration = _u32; }; +size = u32 %{ _size = _u32; }; +blob := any+ >start_blob $advance_blob; + +set = "set" sp key sp flags sp expiration sp size (crlf @{ fcall blob; } ) crlf @{ _state = state::cmd_set; }; +get = "get" (sp key %{ _keys.push_back(std::move(_key)); })+ crlf @{ _state = state::cmd_get; }; +delete = "delete" sp key crlf @{ _state = state::cmd_delete; }; + +main := set | get | delete; + +prepush { + prepush(); +} + +postpop { + postpop(); +} + +}%% + +class memcache_ascii_parser : public ragel_parser_base { + %% write data nofinal noprefix; +public: + enum class state { + error, + eof, + cmd_set, + cmd_get, + cmd_delete + }; + state _state; + uint32_t _u32; + sstring _key; + uint32_t _flags; + uint32_t _expiration; + uint32_t _size; + uint32_t _size_left; + sstring _blob; + std::vector _keys; +public: + void init() { + _state = state::error; + _keys.clear(); + %% write init; + } + + char* parse(char* p, char* pe, char* eof) { + sstring_builder::guard g(_builder, p, pe); + auto str = [this, &g, &p] { g.mark_end(p); return get_str(); }; + %% write exec; + if (_state != state::error) { + return p; + } + return nullptr; + } + bool eof() const { + return _state == state::eof; + } +}; diff --git a/apps/memcache/memcache.cc b/apps/memcache/memcache.cc new file mode 100644 index 0000000000..e029f2717e --- /dev/null +++ b/apps/memcache/memcache.cc @@ -0,0 +1,454 @@ +#include +#include +#include +#include "core/app-template.hh" +#include "core/async-action.hh" +#include "core/timer-set.hh" +#include "core/shared_ptr.hh" +#include "core/stream.hh" +#include "core/vector-data-sink.hh" +#include "net/api.hh" +#include "net/packet-data-source.hh" +#include "apps/memcache/ascii.hh" + +using namespace net; + +namespace bi = boost::intrusive; + +namespace memcache { + +template +using optional = boost::optional; + +using item_key = sstring; + +struct item_data { + sstring _data; + uint32_t _flag; + clock_type::time_point _expiry; +}; + +class item { +private: + item_data _data; + uint64_t _version; + bool _expired; + bi::list_member_hook<> _timer_link; + bi::list_member_hook<> _expired_link; + friend class cache; +public: + item(item_data data) + : _data(std::move(data)) + , _version(1) + , _expired(false) + { + } + + clock_type::time_point get_timeout() { + return _data._expiry; + } + + void update(item_data&& data) { + _data = std::move(data); + _version++; + } + + item_data& data() { + return _data; + } +}; + +struct cache_stats { + size_t _get_hits {}; + size_t _get_misses {}; + size_t _set_adds {}; + size_t _set_replaces {}; +}; + +class cache { +private: + using cache_type = std::unordered_map>; + using cache_iterator = typename cache_type::iterator; + cache_type _cache; + timer_set _alive; + bi::list, &item::_expired_link>> _expired; + timer _timer; + cache_stats _stats; +private: + void expire() { + _alive.expire(clock_type::now()); + while (auto item = _alive.pop_expired()) { + item->_expired = true; + _expired.push_back(*item); + } + _timer.arm(_alive.get_next_timeout()); + } + + inline + cache_iterator find(const item_key& key) { + auto i = _cache.find(key); + if (i != _cache.end()) { + auto& item_ref = *i->second; + if (item_ref._expired) { + _expired.erase(_expired.iterator_to(item_ref)); + _cache.erase(i); + return _cache.end(); + } + } + return i; + } + + inline + void add_overriding(cache_iterator i, item_data&& data) { + auto& item_ref = *i->second; + _alive.remove(item_ref); + item_ref.update(std::move(data)); + if (_alive.insert(item_ref)) { + _timer.rearm(item_ref.get_timeout()); + } + } + + inline + void add_new(item_key&& key, item_data&& data) { + auto r = _cache.emplace(std::move(key), make_shared(std::move(data))); + assert(r.second); + auto& item_ref = *r.first->second; + if (_alive.insert(item_ref)) { + _timer.rearm(item_ref.get_timeout()); + } + } +public: + cache() { + _timer.set_callback([this] { expire(); }); + } + + bool set(item_key&& key, item_data data) { + auto i = find(key); + if (i != _cache.end()) { + add_overriding(i, std::move(data)); + _stats._set_replaces++; + return true; + } else { + add_new(std::move(key), std::move(data)); + _stats._set_adds++; + return false; + } + } + + bool add(item_key&& key, item_data data) { + auto i = find(key); + if (i != _cache.end()) { + return false; + } + + add_new(std::move(key), std::move(data)); + return true; + } + + bool replace(const item_key& key, item_data data) { + auto i = find(key); + if (i == _cache.end()) { + return false; + } + + add_overriding(i, std::move(data)); + return true; + } + + bool remove(const item_key& key) { + auto i = find(key); + if (i == _cache.end()) { + return false; + } + auto& item_ref = *i->second; + _alive.remove(item_ref); + _cache.erase(i); + return true; + } + + shared_ptr get(const item_key& key) { + auto i = find(key); + if (i == _cache.end()) { + _stats._get_misses++; + return {}; + } + _stats._get_hits++; + return i->second; + } + + size_t size() { + return _cache.size(); + } + + cache_stats& stats() { + return _stats; + } +}; + +class ascii_protocol { +private: + cache& _cache; + memcache_ascii_parser _parser; +private: + static constexpr uint32_t seconds_in_a_month = 60 * 60 * 24 * 30; + static constexpr const char *msg_crlf = "\r\n"; + static constexpr const char *msg_error = "ERROR\r\n"; + static constexpr const char *msg_stored = "STORED\r\n"; + static constexpr const char *msg_end = "END\r\n"; + static constexpr const char *msg_value = "VALUE "; + static constexpr const char *msg_deleted = "DELETED\r\n"; + static constexpr const char *msg_not_found = "NOT_FOUND\r\n"; +public: + ascii_protocol(cache& cache) : _cache(cache) {} + + clock_type::time_point seconds_to_time_point(uint32_t seconds) { + if (seconds == 0) { + return clock_type::time_point::max(); + } else if (seconds <= seconds_in_a_month) { + return clock_type::now() + std::chrono::seconds(seconds); + } else { + return clock_type::time_point(std::chrono::seconds(seconds)); + } + } + + future<> handle(input_stream& in, output_stream& out) { + _parser.init(); + return in.consume(_parser).then([this, &out] () -> future<> { + switch (_parser._state) { + case memcache_ascii_parser::state::error: + case memcache_ascii_parser::state::eof: + return out.write(msg_error); + + case memcache_ascii_parser::state::cmd_set: + _cache.set(std::move(_parser._key), + item_data{std::move(_parser._blob), _parser._flags, seconds_to_time_point(_parser._expiration)}); + return out.write(msg_stored); + + case memcache_ascii_parser::state::cmd_get: + { + auto keys_p = make_shared>(std::move(_parser._keys)); + return do_for_each(keys_p->begin(), keys_p->end(), [this, &out, keys_p](auto&& key) mutable { + auto item = _cache.get(key); + if (!item) { + return make_ready_future<>(); + } + return out.write(msg_value) + .then([&out, &key] { + return out.write(key); + }).then([&out] { + return out.write(" "); + }).then([&out, item] { + return out.write(to_sstring(item->data()._flag)); + }).then([&out] { + return out.write(" "); + }).then([&out, item] { + return out.write(to_sstring(item->data()._data.size())); + }).then([&out] { + return out.write(msg_crlf); + }).then([&out, item] { + return out.write(item->data()._data); + }).then([&out] { + return out.write(msg_crlf); + }); + }).then([&out] { + return out.write(msg_end); + }); + } + + case memcache_ascii_parser::state::cmd_delete: + if (_cache.remove(_parser._key)) { + return out.write(msg_deleted); + } + return out.write(msg_not_found); + }; + return make_ready_future<>(); + }); + }; +}; + +void assert_resolved(future<> f) { + assert(f.available()); +} + +class udp_server { +public: + static const size_t default_max_datagram_size = 1400; +private: + ascii_protocol& _proto; + udp_channel _chan; + uint16_t _port; + size_t _max_datagram_size = default_max_datagram_size; + + struct header { + packed _request_id; + packed _sequence_number; + packed _n; + packed _reserved; + + template + auto adjust_endianness(Adjuster a) { + return a(_request_id, _sequence_number, _n); + } + } __attribute__((packed)); + +public: + udp_server(ascii_protocol& proto, uint16_t port = 11211) + : _proto(proto) + , _port(port) + {} + + void set_max_datagram_size(size_t max_datagram_size) { + _max_datagram_size = max_datagram_size; + } + + future<> respond(ipv4_addr dst, uint16_t request_id, std::vector>&& datagrams) { + if (datagrams.size() == 1) { + auto&& buf = datagrams[0]; + auto p = packet(fragment{buf.get_write(), buf.size()}, buf.release()); + header *out_hdr = p.prepend_header
(); + out_hdr->_request_id = request_id; + out_hdr->_sequence_number = 0; + out_hdr->_n = 1; + hton(*out_hdr); + return _chan.send(dst, std::move(p)); + } + + int i = 0; + auto sb = make_shared(std::move(datagrams)); + return do_for_each(sb->begin(), sb->end(), + [this, i, sb, dst, request_id](auto&& buf) mutable { + auto p = packet(fragment{buf.get_write(), buf.size()}, buf.release()); + header *out_hdr = p.prepend_header
(); + out_hdr->_request_id = request_id; + out_hdr->_sequence_number = i++; + out_hdr->_n = sb->size(); + hton(*out_hdr); + return _chan.send(dst, std::move(p)); + }); + } + + void start() { + _chan = engine.net().make_udp_channel({_port}); + keep_doing([this] { + return _chan.receive().then([this](udp_datagram dgram) { + packet& p = dgram.get_data(); + if (p.len() < sizeof(header)) { + // dropping invalid packet + return make_ready_future<>(); + } + + std::vector> out_bufs; + auto out = output_stream(data_sink(std::make_unique(out_bufs)), + _max_datagram_size - sizeof(header)); + + header *hdr = p.get_header
(); + ntoh(*hdr); + p.trim_front(sizeof(*hdr)); + + auto request_id = hdr->_request_id; + + if (hdr->_n != 1 || hdr->_sequence_number != 0) { + out.write("CLIENT_ERROR only single-datagram requests supported\r\n"); + } else { + auto in = as_input_stream(std::move(p)); + assert_resolved(_proto.handle(in, out)); + } + + assert_resolved(out.flush()); + return respond(dgram.get_src(), request_id, std::move(out_bufs)); + }); + }).or_terminate(); + }; +}; + +class tcp_server { +private: + shared_ptr _listener; + cache& _cache; + uint16_t _port; + struct connection { + connected_socket _socket; + socket_address _addr; + input_stream _in; + output_stream _out; + ascii_protocol _proto; + connection(connected_socket&& socket, socket_address addr, cache& c) + : _socket(std::move(socket)) + , _addr(addr) + , _in(_socket.input()) + , _out(_socket.output()) + , _proto(c) + {} + }; +public: + tcp_server(cache& cache, uint16_t port = 11211) : _cache(cache), _port(port) {} + void start() { + listen_options lo; + lo.reuse_address = true; + _listener = engine.listen(make_ipv4_address({_port}), lo); + keep_doing([this] { + return _listener->accept().then([this] (connected_socket fd, socket_address addr) mutable { + auto conn = make_shared(std::move(fd), addr, _cache); + keep_doing([this, conn] { + return conn->_proto.handle(conn->_in, conn->_out).then([conn] { + return conn->_out.flush(); + }); + }); + }); + }).or_terminate(); + } +}; + +class stats_printer { +private: + timer _timer; + cache& _cache; +public: + stats_printer(cache& cache) + : _cache(cache) {} + + void start() { + _timer.set_callback([this] { + auto stats = _cache.stats(); + auto gets_total = stats._get_hits + stats._get_misses; + auto get_hit_rate = gets_total ? ((double)stats._get_hits * 100 / gets_total) : 0; + auto sets_total = stats._set_adds + stats._set_replaces; + auto set_replace_rate = sets_total ? ((double)stats._set_replaces * 100/ sets_total) : 0; + std::cout << "items: " << _cache.size() << " " + << std::setprecision(2) << std::fixed + << "get: " << stats._get_hits << "/" << gets_total << " (" << get_hit_rate << "%) " + << "set: " << stats._set_replaces << "/" << sets_total << " (" << set_replace_rate << "%) " + << std::endl; + }); + _timer.arm_periodic(std::chrono::seconds(1)); + } +}; + +} /* namespace memcache */ + +int main(int ac, char** av) +{ + memcache::cache cache; + memcache::ascii_protocol ascii_protocol(cache); + memcache::udp_server udp_server(ascii_protocol); + memcache::tcp_server tcp_server(cache); + memcache::stats_printer stats(cache); + + app_template app; + app.add_options() + ("max-datagram-size", bpo::value()->default_value(memcache::udp_server::default_max_datagram_size), + "Maximum size of UDP datagram") + ("stats", + "Print basic statistics periodically (every second)") + ; + + return app.run(ac, av, [&] { + auto&& config = app.configuration(); + udp_server.set_max_datagram_size(config["max-datagram-size"].as()); + if (config.count("stats")) { + stats.start(); + } + udp_server.start(); + tcp_server.start(); + }); +} diff --git a/configure.py b/configure.py index 9d9a10fc7e..e482ff138e 100755 --- a/configure.py +++ b/configure.py @@ -12,11 +12,14 @@ tests = [ 'tests/udp_server', 'tests/udp_client', 'tests/blkdiscard_test', + 'tests/sstring_test', + 'tests/memcache/test_ascii_parser', ] apps = [ 'apps/httpd/httpd', 'apps/seastar/seastar', + 'apps/memcache/memcache', ] all_artifacts = apps + tests @@ -43,10 +46,16 @@ core = [ 'net/posix-stack.cc', ] +memcache = [ + 'apps/memcache/ascii.rl' +] + libnet + core + deps = { 'apps/seastar/seastar': ['apps/seastar/main.cc'] + core, 'tests/test-reactor': ['tests/test-reactor.cc'] + core, 'apps/httpd/httpd': ['apps/httpd/httpd.cc', 'apps/httpd/request_parser.rl'] + libnet + core, + 'apps/memcache/memcache': ['apps/memcache/memcache.cc'] + memcache, + 'tests/memcache/test_ascii_parser': ['tests/memcache/test_ascii_parser.cc'] + memcache, 'tests/fileiotest': ['tests/fileiotest.cc'] + core, 'tests/virtiotest': ['tests/virtiotest.cc'] + core + libnet, 'tests/l3_test': ['tests/l3_test.cc'] + core + libnet, @@ -57,6 +66,7 @@ deps = { 'tests/udp_server': ['tests/udp_server.cc'] + core + libnet, 'tests/udp_client': ['tests/udp_client.cc'] + core + libnet, 'tests/blkdiscard_test': ['tests/blkdiscard_test.cc'] + core, + 'tests/sstring_test': ['tests/sstring_test.cc'] + core, } modes = { @@ -74,7 +84,7 @@ modes = { }, } -libs = '-laio -lboost_program_options -lboost_system -lstdc++ -lm' +libs = '-laio -lboost_program_options -lboost_system -lstdc++ -lm -lboost_unit_test_framework' hwloc_libs = '-lhwloc -lnuma -lpciaccess -lxml2 -lz' warnings = [ @@ -139,9 +149,9 @@ def debug_flag(compiler): src_with_auto = textwrap.dedent('''\ template struct x { auto f() {} }; - + x a; - ''') + ''') if try_compile(source = src_with_auto, flags = ['-g', '-std=gnu++1y'], compiler = compiler): return '-g' else: diff --git a/core/app-template.hh b/core/app-template.hh index ac93d717af..e3395aa250 100644 --- a/core/app-template.hh +++ b/core/app-template.hh @@ -53,9 +53,9 @@ public: get_ex(); } catch (std::exception& ex) { std::cout << "program failed with uncaught exception: " << ex.what() << "\n"; - engine.exit(1); + engine.exit(1); } - }); + }); return engine.run(); }; }; diff --git a/core/async-action.hh b/core/async-action.hh index f9447c2b1d..3af904a152 100644 --- a/core/async-action.hh +++ b/core/async-action.hh @@ -45,19 +45,37 @@ future<> do_until(StopCondition&& stop_cond, AsyncAction&& action) { return f; } -// Invoke given action undefinitely. Next invocation starts when previous completes or fails. +// Invoke given action until it fails. template static inline -void keep_doing(AsyncAction&& action) { +future<> keep_doing(AsyncAction&& action) { while (true) { auto f = action(); if (!f.available()) { - f.then([action = std::forward(action)] () mutable { - keep_doing(std::forward(action)); + return f.then([action = std::forward(action)] () mutable { + return keep_doing(std::forward(action)); }); - return; + } + + if (f.failed()) { + return std::move(f); } } } +template +static inline +future<> do_for_each(Iterator begin, Iterator end, AsyncAction&& action) { + while (begin != end) { + auto f = action(*begin++); + if (!f.available()) { + return f.then([action = std::forward(action), + begin = std::move(begin), end = std::move(end)] () mutable { + return do_for_each(std::move(begin), std::move(end), std::forward(action)); + }); + } + } + return make_ready_future<>(); +} + #endif diff --git a/core/future.hh b/core/future.hh index 1b8fd60b2b..da489f2988 100644 --- a/core/future.hh +++ b/core/future.hh @@ -433,6 +433,16 @@ public: return f; } + future<> or_terminate() { + return rescue([] (auto get) { + try { + get(); + } catch (...) { + std::terminate(); + } + }); + } + template friend class promise; template diff --git a/core/reactor.hh b/core/reactor.hh index 911a130db8..414cb70d5d 100644 --- a/core/reactor.hh +++ b/core/reactor.hh @@ -117,6 +117,7 @@ public: future<> expired(); void set_callback(callback_t&& callback); void arm(clock_type::time_point until, boost::optional period = {}); + void rearm(clock_type::time_point until, boost::optional period = {}); void arm(clock_type::duration delta); void arm_periodic(clock_type::duration delta); bool armed() const { return _armed; } @@ -571,10 +572,24 @@ public: output_stream(data_sink fd, size_t size) : _fd(std::move(fd)), _buf(size), _size(size) {} future<> write(const char_type* buf, size_t n); + future<> write(const char_type* buf); + future<> write(const sstring& s); future<> flush(); private: }; +template +inline +future<> output_stream::write(const char_type* buf) { + return write(buf, strlen(buf)); +} + +template +inline +future<> output_stream::write(const sstring& s) { + return write(s.c_str(), s.size()); +} + class file_impl { public: virtual ~file_impl() {} @@ -736,7 +751,7 @@ inline future reactor::write_some(pollable_fd_state& fd, const void* buffer, size_t len) { return writeable(fd).then([this, &fd, buffer, len] () mutable { - auto r = fd.fd.send(buffer, len, 0); + auto r = fd.fd.send(buffer, len, MSG_NOSIGNAL); if (!r) { return write_some(fd, buffer, len); } @@ -982,6 +997,14 @@ void timer::arm(clock_type::time_point until, boost::optional period) { + if (_armed) { + cancel(); + } + arm(until, period); +} + inline void timer::arm(clock_type::duration delta) { return arm(clock_type::now() + delta); diff --git a/core/shared_ptr.hh b/core/shared_ptr.hh index 90a2294e09..44008916c1 100644 --- a/core/shared_ptr.hh +++ b/core/shared_ptr.hh @@ -71,6 +71,11 @@ public: } return *this; } + shared_ptr& operator=(T&& x) { + this->~shared_ptr(); + new (this) shared_ptr(new data(std::move(x))); + return *this; + } T& operator*() const { return _p->_value; } T* operator->() const { return &_p->_value; } diff --git a/core/sstring.hh b/core/sstring.hh index d371514b1d..ee897e2a96 100644 --- a/core/sstring.hh +++ b/core/sstring.hh @@ -243,4 +243,21 @@ string_type to_sstring(unsigned long long value, void* = nullptr) { return to_sstring_sprintf(value, "%llu"); } +template +inline +std::ostream& operator<<(std::ostream& os, const std::vector& v) { + bool first = true; + os << "{"; + for (auto&& elem : v) { + if (!first) { + os << ", "; + } else { + first = false; + } + os << elem; + } + os << "}"; + return os; +} + #endif /* SSTRING_HH_ */ diff --git a/core/vector-data-sink.hh b/core/vector-data-sink.hh new file mode 100644 index 0000000000..ec1a905be0 --- /dev/null +++ b/core/vector-data-sink.hh @@ -0,0 +1,30 @@ +/* + * Copyright 2014 Cloudius Systems + */ + +#ifndef VECTOR_DATA_SINK_HH_ +#define VECTOR_DATA_SINK_HH_ + +#include "core/reactor.hh" + +class vector_data_sink final : public data_sink_impl { +private: + using vector_type = std::vector>; + vector_type& _v; +public: + vector_data_sink(vector_type& v) : _v(v) {} + + virtual future<> put(std::vector> data) override { + for (auto&& buf : data) { + _v.push_back(std::move(buf)); + } + return make_ready_future<>(); + } + + virtual future<> put(temporary_buffer data) override { + _v.push_back(std::move(data)); + return make_ready_future<>(); + } +}; + +#endif diff --git a/net/packet-data-source.hh b/net/packet-data-source.hh new file mode 100644 index 0000000000..178607f989 --- /dev/null +++ b/net/packet-data-source.hh @@ -0,0 +1,35 @@ +#ifndef _PACKET_DATA_SOURCE_HH +#define _PACKET_DATA_SOURCE_HH + +#include "core/reactor.hh" +#include "net/packet.hh" + +namespace net { + +class packet_data_source final : public data_source_impl { + size_t _cur_frag = 0; + packet _p; +public: + explicit packet_data_source(net::packet&& p) + : _p(std::move(p)) + {} + + virtual future> get() override { + if (_cur_frag != _p.nr_frags()) { + auto& f = _p.fragments()[_cur_frag++]; + return make_ready_future>( + temporary_buffer(f.base, f.size, + make_deleter(deleter(), [p = _p.share()] () mutable {}))); + } + return make_ready_future>(temporary_buffer(0)); + } +}; + +static inline +input_stream as_input_stream(packet&& p) { + return input_stream(data_source(std::make_unique(std::move(p)))); +} + +} + +#endif diff --git a/net/packet.hh b/net/packet.hh index c76cae1471..a35601d352 100644 --- a/net/packet.hh +++ b/net/packet.hh @@ -385,7 +385,7 @@ packet::packet(fragment frag, Deleter d, packet&& x) _impl->_frags + _impl->_nr_frags + 1); ++_impl->_nr_frags; _impl->_frags[0] = frag; - _impl->_deleter.reset(make_deleter(std::move(_impl->_deleter), d)); + _impl->_deleter = make_deleter(std::move(_impl->_deleter), d); } template @@ -394,7 +394,7 @@ packet::packet(packet&& x, fragment frag, Deleter d) : _impl(impl::allocate_if_needed(std::move(x._impl), 1)) { _impl->_len += frag.size; _impl->_frags[_impl->_nr_frags++] = frag; - _impl->_deleter.reset(make_deleter(std::move(_impl->_deleter), d)); + _impl->_deleter = make_deleter(std::move(_impl->_deleter), d); } inline diff --git a/net/udp.hh b/net/udp.hh index 311c52cc38..4f9a1ba88b 100644 --- a/net/udp.hh +++ b/net/udp.hh @@ -6,11 +6,11 @@ #ifndef UDP_HH_ #define UDP_HH_ -#include -#include -#include #include #include +#include "net/ip.hh" +#include "core/reactor.hh" +#include "core/shared_ptr.hh" #include "net/api.hh" #include "const.hh" diff --git a/test.py b/test.py new file mode 100755 index 0000000000..2e2e7e0b44 --- /dev/null +++ b/test.py @@ -0,0 +1,45 @@ +#!/usr/bin/env python3 +import os +import sys +import subprocess + +all_tests = [ + 'futures_test', + 'memcache/test_ascii_parser', + 'sstring_test', +] + +last_len = 0 + +def print_status(msg): + global last_len + print('\r' + ' '*last_len, end='') + last_len = len(msg) + print('\r' + msg, end='') + +if __name__ == "__main__": + black_hole = open('/dev/null', 'w') + + test_to_run = [] + for mode in ['debug', 'release']: + for test in all_tests: + test_to_run.append(os.path.join('build', mode, 'tests', test)) + test_to_run.append('tests/memcache/test.py ' + os.path.join('build', mode, 'apps', 'memcache', 'memcache')) + + all_ok = True + + n_total = len(test_to_run) + for n, path in enumerate(test_to_run): + prefix = '[%d/%d]' % (n + 1, n_total) + print_status('%s RUNNING %s' % (prefix, path)) + if subprocess.call(path.split(' '), stdout=black_hole, stderr=black_hole): + print_status('FAILED: %s\n' % (path)) + all_ok = False + else: + print_status('%s PASSED %s' % (prefix, path)) + + if all_ok: + print('\nOK.') + else: + print_status('') + sys.exit(1) diff --git a/tests/futures_test.cc b/tests/futures_test.cc index 5beb3153e4..007515bcfb 100644 --- a/tests/futures_test.cc +++ b/tests/futures_test.cc @@ -2,16 +2,15 @@ * Copyright (C) 2014 Cloudius Systems, Ltd. */ -#include -#include +#include "core/app-template.hh" +#include "core/shared_ptr.hh" #include "test-utils.hh" -future<> test_finally_is_called_on_success_and_failure() { +SEASTAR_TEST_CASE(test_finally_is_called_on_success_and_failure) { auto finally1 = make_shared(); auto finally2 = make_shared(); return make_ready_future().then([] { - OK(); }).finally([=] { *finally1 = true; }).then([] { @@ -19,66 +18,50 @@ future<> test_finally_is_called_on_success_and_failure() { }).finally([=] { *finally2 = true; }).rescue([=] (auto get) { - if (!*finally1) { - BUG(); - } - if (!*finally2) { - BUG(); - } + BOOST_REQUIRE(*finally1); + BOOST_REQUIRE(*finally2); // Should be failed. try { get(); - BUG(); - } catch (...) { - OK(); - } + BOOST_REQUIRE(false); + } catch (...) {} }); } -future<> test_exception_from_finally_fails_the_target() { +SEASTAR_TEST_CASE(test_exception_from_finally_fails_the_target) { promise<> pr; auto f = pr.get_future().finally([=] { - OK(); throw std::runtime_error(""); }).then([] { - BUG(); - }).rescue([] (auto get) { - OK(); - }); + BOOST_REQUIRE(false); + }).rescue([] (auto get) {}); pr.set_value(); return f; } -future<> test_exception_from_finally_fails_the_target_on_already_resolved() { +SEASTAR_TEST_CASE(test_exception_from_finally_fails_the_target_on_already_resolved) { return make_ready_future().finally([=] { - OK(); throw std::runtime_error(""); }).then([] { - BUG(); - }).rescue([] (auto get) { - OK(); - }); + BOOST_REQUIRE(false); + }).rescue([] (auto get) {}); } -future<> test_exception_thrown_from_rescue_causes_future_to_fail() -{ +SEASTAR_TEST_CASE(test_exception_thrown_from_rescue_causes_future_to_fail) { return make_ready_future().rescue([] (auto get) { throw std::runtime_error(""); }).rescue([] (auto get) { try { get(); - BUG(); - } catch (...) { - OK(); - } + BOOST_REQUIRE(false); + } catch (...) {} }); } -future<> test_exception_thrown_from_rescue_causes_future_to_fail__async_case() -{ +SEASTAR_TEST_CASE(test_exception_thrown_from_rescue_causes_future_to_fail__async_case) { promise<> p; auto f = p.get_future().rescue([] (auto get) { @@ -86,10 +69,8 @@ future<> test_exception_thrown_from_rescue_causes_future_to_fail__async_case() }).rescue([] (auto get) { try { get(); - BUG(); - } catch (...) { - OK(); - } + BOOST_REQUIRE(false); + } catch (...) {} }); p.set_value(); @@ -97,15 +78,14 @@ future<> test_exception_thrown_from_rescue_causes_future_to_fail__async_case() return f; } -future<> test_failing_intermediate_promise_should_fail_the_master_future() { +SEASTAR_TEST_CASE(test_failing_intermediate_promise_should_fail_the_master_future) { promise<> p1; promise<> p2; auto f = p1.get_future().then([f = std::move(p2.get_future())] () mutable { - OK(); return std::move(f); }).then([] { - BUG(); + BOOST_REQUIRE(false); }); p1.set_value(); @@ -114,23 +94,7 @@ future<> test_failing_intermediate_promise_should_fail_the_master_future() { return f.rescue([](auto get) { try { get(); - BUG(); - } catch (...) { - OK(); - } - }); -} - -int main(int ac, char **av) -{ - return app_template().run(ac, av, [] { - run_tests( - test_finally_is_called_on_success_and_failure() - .then(test_exception_from_finally_fails_the_target) - .then(test_exception_from_finally_fails_the_target_on_already_resolved) - .then(test_exception_thrown_from_rescue_causes_future_to_fail) - .then(test_exception_thrown_from_rescue_causes_future_to_fail__async_case) - .then(test_failing_intermediate_promise_should_fail_the_master_future) - ); + BOOST_REQUIRE(false); + } catch (...) {} }); } diff --git a/tests/memcache/test.py b/tests/memcache/test.py new file mode 100755 index 0000000000..84fa77e95d --- /dev/null +++ b/tests/memcache/test.py @@ -0,0 +1,24 @@ +#!/usr/bin/env python3 +import time +import subprocess +import sys + +if len(sys.argv) < 2: + print('Usage: %s ' % sys.argv[0]) + +memcache_path = sys.argv[1] + +def run(cmd): + mc = subprocess.Popen([memcache_path]) + print('Memcache started.') + try: + time.sleep(0.1) + cmdline = ['tests/memcache/test_memcache.py'] + cmd + print('Running: ' + ' '.join(cmdline)) + subprocess.check_call(cmdline) + finally: + print('Killing memcache...') + mc.kill() + +run([]) +run(['-U']) diff --git a/tests/memcache/test_ascii_parser.cc b/tests/memcache/test_ascii_parser.cc new file mode 100644 index 0000000000..7288ad6db3 --- /dev/null +++ b/tests/memcache/test_ascii_parser.cc @@ -0,0 +1,232 @@ +/* + * Copyright (C) 2014 Cloudius Systems, Ltd. + */ + +#include +#include +#include "tests/test-utils.hh" +#include "core/shared_ptr.hh" +#include "net/packet-data-source.hh" +#include "apps/memcache/ascii.hh" + +using namespace net; + +using parser_type = memcache_ascii_parser; + +static packet make_packet(std::vector chunks) { + packet p; + for (auto&& chunk : chunks) { + size_t size = chunk.size(); + char* b = new char[size]; + memcpy(b, chunk.c_str(), size); + p = packet(std::move(p), fragment{b, size}, [b] { delete[] b; }); + } + return p; +} + +static auto make_input_stream(packet&& p) { + return input_stream(data_source( + std::make_unique(std::move(p)))); +} + +static auto parse(packet&& p) { + auto is = make_input_stream(std::move(p)); + auto parser = make_shared(); + parser->init(); + return is.consume(*parser).then([parser] { + return make_ready_future>(parser); + }); +} + +SEASTAR_TEST_CASE(test_set_command_is_parsed) { + return parse(make_packet({"set key 1 2 3\r\nabc\r\n"})) + .then([] (auto p) { + BOOST_REQUIRE(p->_state == parser_type::state::cmd_set); + BOOST_REQUIRE(p->_flags == 1); + BOOST_REQUIRE(p->_expiration == 2); + BOOST_REQUIRE(p->_size == 3); + BOOST_REQUIRE(p->_key == "key"); + BOOST_REQUIRE(p->_blob == "abc"); + }); +} + +SEASTAR_TEST_CASE(test_empty_data_is_parsed) { + return parse(make_packet({"set key 1 2 0\r\n\r\n"})) + .then([] (auto p) { + BOOST_REQUIRE(p->_state == parser_type::state::cmd_set); + BOOST_REQUIRE(p->_flags == 1); + BOOST_REQUIRE(p->_expiration == 2); + BOOST_REQUIRE(p->_size == 0); + BOOST_REQUIRE(p->_key == "key"); + BOOST_REQUIRE(p->_blob == ""); + }); +} + +SEASTAR_TEST_CASE(test_superflous_data_is_an_error) { + return parse(make_packet({"set key 0 0 0\r\nasd\r\n"})) + .then([] (auto p) { + BOOST_REQUIRE(p->_state == parser_type::state::error); + }); +} + +SEASTAR_TEST_CASE(test_not_enough_data_is_an_error) { + return parse(make_packet({"set key 0 0 3\r\n"})) + .then([] (auto p) { + BOOST_REQUIRE(p->_state == parser_type::state::error); + }); +} + +SEASTAR_TEST_CASE(test_u32_parsing) { + return make_ready_future<>() + .then([] { + return parse(make_packet({"set key 0 0 0\r\n\r\n"})) + .then([] (auto p) { + BOOST_REQUIRE(p->_state == parser_type::state::cmd_set); + BOOST_REQUIRE(p->_flags == 0); + }); + }).then([] { + return parse(make_packet({"set key 12345 0 0\r\n\r\n"})) + .then([] (auto p) { + BOOST_REQUIRE(p->_state == parser_type::state::cmd_set); + BOOST_REQUIRE(p->_flags == 12345); + }); + }).then([] { + return parse(make_packet({"set key -1 0 0\r\n\r\n"})) + .then([] (auto p) { + BOOST_REQUIRE(p->_state == parser_type::state::error); + }); + }).then([] { + return parse(make_packet({"set key 1-1 0 0\r\n\r\n"})) + .then([] (auto p) { + BOOST_REQUIRE(p->_state == parser_type::state::error); + }); + }).then([] { + return parse(make_packet({"set key " + std::to_string(std::numeric_limits::max()) + " 0 0\r\n\r\n"})) + .then([] (auto p) { + BOOST_REQUIRE(p->_state == parser_type::state::cmd_set); + BOOST_REQUIRE(p->_flags == std::numeric_limits::max()); + }); + }); +} + +SEASTAR_TEST_CASE(test_parsing_of_split_data) { + return make_ready_future<>() + .then([] { + return parse(make_packet({"set key 11", "1 222 3\r\nasd\r\n"})) + .then([] (auto p) { + BOOST_REQUIRE(p->_state == parser_type::state::cmd_set); + BOOST_REQUIRE(p->_key == "key"); + BOOST_REQUIRE(p->_flags == 111); + BOOST_REQUIRE(p->_expiration == 222); + BOOST_REQUIRE(p->_size == 3); + BOOST_REQUIRE(p->_blob == "asd"); + }); + }).then([] { + return parse(make_packet({"set key 11", "1 22", "2 3", "\r\nasd\r\n"})) + .then([] (auto p) { + BOOST_REQUIRE(p->_state == parser_type::state::cmd_set); + BOOST_REQUIRE(p->_key == "key"); + BOOST_REQUIRE(p->_flags == 111); + BOOST_REQUIRE(p->_expiration == 222); + BOOST_REQUIRE(p->_size == 3); + BOOST_REQUIRE(p->_blob == "asd"); + }); + }).then([] { + return parse(make_packet({"set k", "ey 11", "1 2", "2", "2 3", "\r\nasd\r\n"})) + .then([] (auto p) { + BOOST_REQUIRE(p->_state == parser_type::state::cmd_set); + BOOST_REQUIRE(p->_key == "key"); + BOOST_REQUIRE(p->_flags == 111); + BOOST_REQUIRE(p->_expiration == 222); + BOOST_REQUIRE(p->_size == 3); + BOOST_REQUIRE(p->_blob == "asd"); + }); + }).then([] { + return parse(make_packet({"set key 111 222 3\r\n", "asd\r\n"})) + .then([] (auto p) { + BOOST_REQUIRE(p->_state == parser_type::state::cmd_set); + BOOST_REQUIRE(p->_key == "key"); + BOOST_REQUIRE(p->_flags == 111); + BOOST_REQUIRE(p->_expiration == 222); + BOOST_REQUIRE(p->_size == 3); + BOOST_REQUIRE(p->_blob == "asd"); + }); + }).then([] { + return parse(make_packet({"set key 111 222 3\r\na", "sd\r\n"})) + .then([] (auto p) { + BOOST_REQUIRE(p->_state == parser_type::state::cmd_set); + BOOST_REQUIRE(p->_key == "key"); + BOOST_REQUIRE(p->_flags == 111); + BOOST_REQUIRE(p->_expiration == 222); + BOOST_REQUIRE(p->_size == 3); + BOOST_REQUIRE(p->_blob == "asd"); + }); + }).then([] { + return parse(make_packet({"set key 111 222 3\r\nasd", "\r\n"})) + .then([] (auto p) { + BOOST_REQUIRE(p->_state == parser_type::state::cmd_set); + BOOST_REQUIRE(p->_key == "key"); + BOOST_REQUIRE(p->_flags == 111); + BOOST_REQUIRE(p->_expiration == 222); + BOOST_REQUIRE(p->_size == 3); + BOOST_REQUIRE(p->_blob == "asd"); + }); + }).then([] { + return parse(make_packet({"set key 111 222 3\r\nasd\r", "\n"})) + .then([] (auto p) { + BOOST_REQUIRE(p->_state == parser_type::state::cmd_set); + BOOST_REQUIRE(p->_key == "key"); + BOOST_REQUIRE(p->_flags == 111); + BOOST_REQUIRE(p->_expiration == 222); + BOOST_REQUIRE(p->_size == 3); + BOOST_REQUIRE(p->_blob == "asd"); + }); + }); +} + +SEASTAR_TEST_CASE(test_get_parsing) { + return make_ready_future<>() + .then([] { + return parse(make_packet({"get key1\r\n"})) + .then([] (auto p) { + BOOST_REQUIRE(p->_state == parser_type::state::cmd_get); + BOOST_REQUIRE_EQUAL(p->_keys, std::vector({"key1"})); + }); + }).then([] { + return parse(make_packet({"get key1 key2\r\n"})) + .then([] (auto p) { + BOOST_REQUIRE(p->_state == parser_type::state::cmd_get); + BOOST_REQUIRE_EQUAL(p->_keys, std::vector({"key1", "key2"})); + }); + }).then([] { + return parse(make_packet({"get key1 key2 key3\r\n"})) + .then([] (auto p) { + BOOST_REQUIRE(p->_state == parser_type::state::cmd_get); + BOOST_REQUIRE_EQUAL(p->_keys, std::vector({"key1", "key2", "key3"})); + }); + }); +} + +SEASTAR_TEST_CASE(test_multiple_requests_in_one_stream) { + auto p = make_shared(); + auto is = make_shared(make_input_stream(make_packet({"set key1 1 1 5\r\ndata1\r\nset key2 2 2 6\r\ndata2+\r\n"}))); + p->init(); + return is->consume(*p).then([p] { + BOOST_REQUIRE(p->_state == parser_type::state::cmd_set); + BOOST_REQUIRE(p->_key == "key1"); + BOOST_REQUIRE(p->_flags == 1); + BOOST_REQUIRE(p->_expiration == 1); + BOOST_REQUIRE(p->_size == 5); + BOOST_REQUIRE(p->_blob == "data1"); + }).then([is, p] { + p->init(); + return is->consume(*p).then([p] { + BOOST_REQUIRE(p->_state == parser_type::state::cmd_set); + BOOST_REQUIRE(p->_key == "key2"); + BOOST_REQUIRE(p->_flags == 2); + BOOST_REQUIRE(p->_expiration == 2); + BOOST_REQUIRE(p->_size == 6); + BOOST_REQUIRE(p->_blob == "data2+"); + }); + }); +} diff --git a/tests/memcache/test_memcache.py b/tests/memcache/test_memcache.py new file mode 100755 index 0000000000..def7f842ad --- /dev/null +++ b/tests/memcache/test_memcache.py @@ -0,0 +1,120 @@ +#!/usr/bin/env python3 +import socket +import struct +import random +import argparse +import time +import unittest + +server_addr = None +call = None + +def tcp_call(server_addr, msg): + s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + s.connect(server_addr) + s.send(msg.encode()) + data = s.recv(16*1024) + s.close() + return data + +def udp_call(server_addr, msg): + sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + this_req_id = random.randint(-32768, 32767) + + datagram = struct.pack(">hhhh", this_req_id, 0, 1, 0) + msg.encode() + sock.sendto(datagram, server_addr) + + messages = {} + n_determined = None + while True: + data, addr = sock.recvfrom(1500) + req_id, seq, n, res = struct.unpack_from(">hhhh", data) + content = data[8:] + + if n_determined and n_determined != n: + raise Exception('Inconsitent number of total messages, %d and %d' % (n_determined, n)) + n_determined = n + + if req_id != this_req_id: + raise Exception('Invalid request id: ' + req_id + ', expected ' + this_req_id) + + if seq in messages: + raise Exception('Duplicate message for seq=' + seq) + + messages[seq] = content + if len(messages) == n: + break + + msg = b'' + for k, v in sorted(messages.items(), key=lambda e: e[0]): + msg += v + + sock.close() + return msg + +class TestCommands(unittest.TestCase): + def call_set(self, key, value, flags=0, expiry=0): + self.assertEqual(call('set %s %d %d %d\r\n%s\r\n' % (key, flags, expiry, len(value), value)), b'STORED\r\n') + + def call_delete(self, key): + self.assertEqual(call('delete %s\r\n' % key), b'DELETED\r\n') + + def test_basic_commands(self): + self.assertEqual(call('get key\r\n'), b'END\r\n') + self.assertEqual(call('set key 0 0 5\r\nhello\r\n'), b'STORED\r\n') + self.assertEqual(call('get key\r\n'), b'VALUE key 0 5\r\nhello\r\nEND\r\n') + self.assertEqual(call('delete key\r\n'), b'DELETED\r\n') + self.assertEqual(call('delete key\r\n'), b'NOT_FOUND\r\n') + self.assertEqual(call('get key\r\n'), b'END\r\n') + + def test_expiry(self): + self.assertEqual(call('set key 0 1 5\r\nhello\r\n'), b'STORED\r\n') + self.assertEqual(call('get key\r\n'), b'VALUE key 0 5\r\nhello\r\nEND\r\n') + time.sleep(1) + self.assertEqual(call('get key\r\n'), b'END\r\n') + + def test_expiry_at_epoch_time(self): + expiry = int(time.time()) + 1 + self.assertEqual(call('set key 0 %d 5\r\nhello\r\n' % expiry), b'STORED\r\n') + self.assertEqual(call('get key\r\n'), b'VALUE key 0 5\r\nhello\r\nEND\r\n') + time.sleep(2) + self.assertEqual(call('get key\r\n'), b'END\r\n') + + def test_mutliple_keys_in_get(self): + self.assertEqual(call('set key1 0 0 2\r\nv1\r\n'), b'STORED\r\n') + self.assertEqual(call('set key 0 0 2\r\nv2\r\n'), b'STORED\r\n') + self.assertEqual(call('get key1 key\r\n'), b'VALUE key1 0 2\r\nv1\r\nVALUE key 0 2\r\nv2\r\nEND\r\n') + + def test_response_spanning_many_datagrams(self): + key1_data = '1' * 1000 + key2_data = '2' * 1000 + key3_data = '3' * 1000 + self.call_set('key1', key1_data) + self.call_set('key2', key2_data) + self.call_set('key3', key3_data) + self.assertEqual(call('get key1 key2 key3\r\n').decode(), + 'VALUE key1 0 %d\r\n%s\r\n' \ + 'VALUE key2 0 %d\r\n%s\r\n' \ + 'VALUE key3 0 %d\r\n%s\r\n' \ + 'END\r\n' % (len(key1_data), key1_data, len(key2_data), key2_data, len(key3_data), key3_data)) + self.call_delete('key1') + self.call_delete('key2') + self.call_delete('key3') + +if __name__ == '__main__': + parser = argparse.ArgumentParser(description="memcache protocol tests") + parser.add_argument('--server', '-s', action="store", help="server adddress in : format", default="localhost:11211") + parser.add_argument('--udp', '-U', action="store_true", help="Use UDP protocol") + args = parser.parse_args() + + host, port = args.server.split(':') + server_addr = (host, int(port)) + + if args.udp: + call = lambda msg: udp_call(server_addr, msg) + else: + call = lambda msg: tcp_call(server_addr, msg) + + runner = unittest.TextTestRunner() + itersuite = unittest.TestLoader().loadTestsFromTestCase(TestCommands) + runner.run(itersuite) diff --git a/tests/sstring_test.cc b/tests/sstring_test.cc new file mode 100644 index 0000000000..9c5e60fcdd --- /dev/null +++ b/tests/sstring_test.cc @@ -0,0 +1,17 @@ +/* + * Copyright 2014 Cloudius Systems + */ + +#define BOOST_TEST_DYN_LINK +#define BOOST_TEST_MODULE core + +#include +#include "core/sstring.hh" + +BOOST_AUTO_TEST_CASE(test_equality) { + BOOST_REQUIRE_EQUAL(sstring("aaa"), sstring("aaa")); +} + +BOOST_AUTO_TEST_CASE(test_to_sstring) { + BOOST_REQUIRE_EQUAL(to_sstring(1234567), sstring("1234567")); +} diff --git a/tests/test-utils.hh b/tests/test-utils.hh index 555b3b7510..951adc9f76 100644 --- a/tests/test-utils.hh +++ b/tests/test-utils.hh @@ -6,27 +6,58 @@ #define _TEST_UTILS_HH #include -#include +#include +#include "core/future.hh" +#include "core/reactor.hh" +#include "core/app-template.hh" -#define BUG() do { \ - std::cerr << "ERROR @ " << __FILE__ << ":" << __LINE__ << std::endl; \ - throw std::runtime_error("test failed"); \ - } while (0) +using namespace boost::unit_test; -#define OK() { \ - std::cerr << "OK @ " << __FILE__ << ":" << __LINE__ << std::endl; \ - } while (0) +class seastar_test { +public: + seastar_test(); + virtual ~seastar_test() {} + virtual const char* get_name() = 0; + virtual future<> run_test_case() = 0; -static inline -void run_tests(future<> tests_done) { - tests_done.rescue([] (auto get) { - try { - get(); - exit(0); - } catch(...) { - std::terminate(); - } - }); + void run() { + posix_thread t([this] { + engine.when_started().then([this] { + return run_test_case(); + }).rescue([] (auto get) { + try { + get(); + engine.exit(0); + } catch (...) { + std::terminate(); + } + }); + engine.run(); + }); + t.join(); + } +}; + +static std::vector tests; + +seastar_test::seastar_test() { + tests.push_back(this); } +test_suite* init_unit_test_suite(int argc, char* argv[]) { + test_suite* ts = BOOST_TEST_SUITE("seastar-tests"); + for (seastar_test* test : tests) { + ts->add(boost::unit_test::make_test_case([test] { test->run(); }, test->get_name())); + } + return ts; +} + +#define SEASTAR_TEST_CASE(name) \ + struct name : public seastar_test { \ + const char* get_name() override { return #name; } \ + future<> run_test_case() override; \ + }; \ + static name name ## _instance; \ + future<> name::run_test_case() + #endif diff --git a/tests/timertest.cc b/tests/timertest.cc index 93aa584cf6..aca5808077 100644 --- a/tests/timertest.cc +++ b/tests/timertest.cc @@ -4,11 +4,19 @@ #include "core/reactor.hh" #include "core/print.hh" -#include "test-utils.hh" #include using namespace std::chrono_literals; +#define BUG() do { \ + std::cerr << "ERROR @ " << __FILE__ << ":" << __LINE__ << std::endl; \ + throw std::runtime_error("test failed"); \ + } while (0) + +#define OK() { \ + std::cerr << "OK @ " << __FILE__ << ":" << __LINE__ << std::endl; \ + } while (0) + struct timer_test { timer t1; timer t2;