Merge branch 'memcache' of github.com:cloudius-systems/seastar-dev

Initial version of memcached on seastar from Tomasz:

"Supports subset of ASCII protocol, commands: get, set and delete.
Both UDP and TCP are supported."
This commit is contained in:
Avi Kivity
2014-10-15 19:20:39 +03:00
21 changed files with 1237 additions and 94 deletions

100
apps/memcache/ascii.rl Normal file
View File

@@ -0,0 +1,100 @@
/*
* Copyright (C) 2014 Cloudius Systems, Ltd.
*/
#include "core/ragel.hh"
#include <memory>
#include <algorithm>
#include <functional>
%%{
machine memcache_ascii_protocol;
access _fsm_;
action mark {
g.mark_start(p);
}
action start_blob {
g.mark_start(p);
_size_left = _size;
}
action advance_blob {
auto len = std::min((uint32_t)(pe - p), _size_left);
_size_left -= len;
p += len;
if (_size_left == 0) {
_blob = str();
p--;
fret;
}
p--;
}
crlf = '\r\n';
sp = ' ';
u32 = digit+ >{ _u32 = 0; } ${ _u32 *= 10; _u32 += fc - '0'; };
key = [^ ]+ >mark %{ _key = str(); };
flags = u32 %{ _flags = _u32; };
expiration = u32 %{ _expiration = _u32; };
size = u32 %{ _size = _u32; };
blob := any+ >start_blob $advance_blob;
set = "set" sp key sp flags sp expiration sp size (crlf @{ fcall blob; } ) crlf @{ _state = state::cmd_set; };
get = "get" (sp key %{ _keys.push_back(std::move(_key)); })+ crlf @{ _state = state::cmd_get; };
delete = "delete" sp key crlf @{ _state = state::cmd_delete; };
main := set | get | delete;
prepush {
prepush();
}
postpop {
postpop();
}
}%%
class memcache_ascii_parser : public ragel_parser_base<memcache_ascii_parser> {
%% write data nofinal noprefix;
public:
enum class state {
error,
eof,
cmd_set,
cmd_get,
cmd_delete
};
state _state;
uint32_t _u32;
sstring _key;
uint32_t _flags;
uint32_t _expiration;
uint32_t _size;
uint32_t _size_left;
sstring _blob;
std::vector<sstring> _keys;
public:
void init() {
_state = state::error;
_keys.clear();
%% write init;
}
char* parse(char* p, char* pe, char* eof) {
sstring_builder::guard g(_builder, p, pe);
auto str = [this, &g, &p] { g.mark_end(p); return get_str(); };
%% write exec;
if (_state != state::error) {
return p;
}
return nullptr;
}
bool eof() const {
return _state == state::eof;
}
};

454
apps/memcache/memcache.cc Normal file
View File

@@ -0,0 +1,454 @@
#include <boost/intrusive/list.hpp>
#include <boost/optional.hpp>
#include <iomanip>
#include "core/app-template.hh"
#include "core/async-action.hh"
#include "core/timer-set.hh"
#include "core/shared_ptr.hh"
#include "core/stream.hh"
#include "core/vector-data-sink.hh"
#include "net/api.hh"
#include "net/packet-data-source.hh"
#include "apps/memcache/ascii.hh"
using namespace net;
namespace bi = boost::intrusive;
namespace memcache {
template<typename T>
using optional = boost::optional<T>;
using item_key = sstring;
struct item_data {
sstring _data;
uint32_t _flag;
clock_type::time_point _expiry;
};
class item {
private:
item_data _data;
uint64_t _version;
bool _expired;
bi::list_member_hook<> _timer_link;
bi::list_member_hook<> _expired_link;
friend class cache;
public:
item(item_data data)
: _data(std::move(data))
, _version(1)
, _expired(false)
{
}
clock_type::time_point get_timeout() {
return _data._expiry;
}
void update(item_data&& data) {
_data = std::move(data);
_version++;
}
item_data& data() {
return _data;
}
};
struct cache_stats {
size_t _get_hits {};
size_t _get_misses {};
size_t _set_adds {};
size_t _set_replaces {};
};
class cache {
private:
using cache_type = std::unordered_map<item_key, shared_ptr<item>>;
using cache_iterator = typename cache_type::iterator;
cache_type _cache;
timer_set<item, &item::_timer_link, clock_type> _alive;
bi::list<item, bi::member_hook<item, bi::list_member_hook<>, &item::_expired_link>> _expired;
timer _timer;
cache_stats _stats;
private:
void expire() {
_alive.expire(clock_type::now());
while (auto item = _alive.pop_expired()) {
item->_expired = true;
_expired.push_back(*item);
}
_timer.arm(_alive.get_next_timeout());
}
inline
cache_iterator find(const item_key& key) {
auto i = _cache.find(key);
if (i != _cache.end()) {
auto& item_ref = *i->second;
if (item_ref._expired) {
_expired.erase(_expired.iterator_to(item_ref));
_cache.erase(i);
return _cache.end();
}
}
return i;
}
inline
void add_overriding(cache_iterator i, item_data&& data) {
auto& item_ref = *i->second;
_alive.remove(item_ref);
item_ref.update(std::move(data));
if (_alive.insert(item_ref)) {
_timer.rearm(item_ref.get_timeout());
}
}
inline
void add_new(item_key&& key, item_data&& data) {
auto r = _cache.emplace(std::move(key), make_shared<item>(std::move(data)));
assert(r.second);
auto& item_ref = *r.first->second;
if (_alive.insert(item_ref)) {
_timer.rearm(item_ref.get_timeout());
}
}
public:
cache() {
_timer.set_callback([this] { expire(); });
}
bool set(item_key&& key, item_data data) {
auto i = find(key);
if (i != _cache.end()) {
add_overriding(i, std::move(data));
_stats._set_replaces++;
return true;
} else {
add_new(std::move(key), std::move(data));
_stats._set_adds++;
return false;
}
}
bool add(item_key&& key, item_data data) {
auto i = find(key);
if (i != _cache.end()) {
return false;
}
add_new(std::move(key), std::move(data));
return true;
}
bool replace(const item_key& key, item_data data) {
auto i = find(key);
if (i == _cache.end()) {
return false;
}
add_overriding(i, std::move(data));
return true;
}
bool remove(const item_key& key) {
auto i = find(key);
if (i == _cache.end()) {
return false;
}
auto& item_ref = *i->second;
_alive.remove(item_ref);
_cache.erase(i);
return true;
}
shared_ptr<item> get(const item_key& key) {
auto i = find(key);
if (i == _cache.end()) {
_stats._get_misses++;
return {};
}
_stats._get_hits++;
return i->second;
}
size_t size() {
return _cache.size();
}
cache_stats& stats() {
return _stats;
}
};
class ascii_protocol {
private:
cache& _cache;
memcache_ascii_parser _parser;
private:
static constexpr uint32_t seconds_in_a_month = 60 * 60 * 24 * 30;
static constexpr const char *msg_crlf = "\r\n";
static constexpr const char *msg_error = "ERROR\r\n";
static constexpr const char *msg_stored = "STORED\r\n";
static constexpr const char *msg_end = "END\r\n";
static constexpr const char *msg_value = "VALUE ";
static constexpr const char *msg_deleted = "DELETED\r\n";
static constexpr const char *msg_not_found = "NOT_FOUND\r\n";
public:
ascii_protocol(cache& cache) : _cache(cache) {}
clock_type::time_point seconds_to_time_point(uint32_t seconds) {
if (seconds == 0) {
return clock_type::time_point::max();
} else if (seconds <= seconds_in_a_month) {
return clock_type::now() + std::chrono::seconds(seconds);
} else {
return clock_type::time_point(std::chrono::seconds(seconds));
}
}
future<> handle(input_stream<char>& in, output_stream<char>& out) {
_parser.init();
return in.consume(_parser).then([this, &out] () -> future<> {
switch (_parser._state) {
case memcache_ascii_parser::state::error:
case memcache_ascii_parser::state::eof:
return out.write(msg_error);
case memcache_ascii_parser::state::cmd_set:
_cache.set(std::move(_parser._key),
item_data{std::move(_parser._blob), _parser._flags, seconds_to_time_point(_parser._expiration)});
return out.write(msg_stored);
case memcache_ascii_parser::state::cmd_get:
{
auto keys_p = make_shared<std::vector<sstring>>(std::move(_parser._keys));
return do_for_each(keys_p->begin(), keys_p->end(), [this, &out, keys_p](auto&& key) mutable {
auto item = _cache.get(key);
if (!item) {
return make_ready_future<>();
}
return out.write(msg_value)
.then([&out, &key] {
return out.write(key);
}).then([&out] {
return out.write(" ");
}).then([&out, item] {
return out.write(to_sstring(item->data()._flag));
}).then([&out] {
return out.write(" ");
}).then([&out, item] {
return out.write(to_sstring(item->data()._data.size()));
}).then([&out] {
return out.write(msg_crlf);
}).then([&out, item] {
return out.write(item->data()._data);
}).then([&out] {
return out.write(msg_crlf);
});
}).then([&out] {
return out.write(msg_end);
});
}
case memcache_ascii_parser::state::cmd_delete:
if (_cache.remove(_parser._key)) {
return out.write(msg_deleted);
}
return out.write(msg_not_found);
};
return make_ready_future<>();
});
};
};
void assert_resolved(future<> f) {
assert(f.available());
}
class udp_server {
public:
static const size_t default_max_datagram_size = 1400;
private:
ascii_protocol& _proto;
udp_channel _chan;
uint16_t _port;
size_t _max_datagram_size = default_max_datagram_size;
struct header {
packed<uint16_t> _request_id;
packed<uint16_t> _sequence_number;
packed<uint16_t> _n;
packed<uint16_t> _reserved;
template<typename Adjuster>
auto adjust_endianness(Adjuster a) {
return a(_request_id, _sequence_number, _n);
}
} __attribute__((packed));
public:
udp_server(ascii_protocol& proto, uint16_t port = 11211)
: _proto(proto)
, _port(port)
{}
void set_max_datagram_size(size_t max_datagram_size) {
_max_datagram_size = max_datagram_size;
}
future<> respond(ipv4_addr dst, uint16_t request_id, std::vector<temporary_buffer<char>>&& datagrams) {
if (datagrams.size() == 1) {
auto&& buf = datagrams[0];
auto p = packet(fragment{buf.get_write(), buf.size()}, buf.release());
header *out_hdr = p.prepend_header<header>();
out_hdr->_request_id = request_id;
out_hdr->_sequence_number = 0;
out_hdr->_n = 1;
hton(*out_hdr);
return _chan.send(dst, std::move(p));
}
int i = 0;
auto sb = make_shared(std::move(datagrams));
return do_for_each(sb->begin(), sb->end(),
[this, i, sb, dst, request_id](auto&& buf) mutable {
auto p = packet(fragment{buf.get_write(), buf.size()}, buf.release());
header *out_hdr = p.prepend_header<header>();
out_hdr->_request_id = request_id;
out_hdr->_sequence_number = i++;
out_hdr->_n = sb->size();
hton(*out_hdr);
return _chan.send(dst, std::move(p));
});
}
void start() {
_chan = engine.net().make_udp_channel({_port});
keep_doing([this] {
return _chan.receive().then([this](udp_datagram dgram) {
packet& p = dgram.get_data();
if (p.len() < sizeof(header)) {
// dropping invalid packet
return make_ready_future<>();
}
std::vector<temporary_buffer<char>> out_bufs;
auto out = output_stream<char>(data_sink(std::make_unique<vector_data_sink>(out_bufs)),
_max_datagram_size - sizeof(header));
header *hdr = p.get_header<header>();
ntoh(*hdr);
p.trim_front(sizeof(*hdr));
auto request_id = hdr->_request_id;
if (hdr->_n != 1 || hdr->_sequence_number != 0) {
out.write("CLIENT_ERROR only single-datagram requests supported\r\n");
} else {
auto in = as_input_stream(std::move(p));
assert_resolved(_proto.handle(in, out));
}
assert_resolved(out.flush());
return respond(dgram.get_src(), request_id, std::move(out_bufs));
});
}).or_terminate();
};
};
class tcp_server {
private:
shared_ptr<server_socket> _listener;
cache& _cache;
uint16_t _port;
struct connection {
connected_socket _socket;
socket_address _addr;
input_stream<char> _in;
output_stream<char> _out;
ascii_protocol _proto;
connection(connected_socket&& socket, socket_address addr, cache& c)
: _socket(std::move(socket))
, _addr(addr)
, _in(_socket.input())
, _out(_socket.output())
, _proto(c)
{}
};
public:
tcp_server(cache& cache, uint16_t port = 11211) : _cache(cache), _port(port) {}
void start() {
listen_options lo;
lo.reuse_address = true;
_listener = engine.listen(make_ipv4_address({_port}), lo);
keep_doing([this] {
return _listener->accept().then([this] (connected_socket fd, socket_address addr) mutable {
auto conn = make_shared<connection>(std::move(fd), addr, _cache);
keep_doing([this, conn] {
return conn->_proto.handle(conn->_in, conn->_out).then([conn] {
return conn->_out.flush();
});
});
});
}).or_terminate();
}
};
class stats_printer {
private:
timer _timer;
cache& _cache;
public:
stats_printer(cache& cache)
: _cache(cache) {}
void start() {
_timer.set_callback([this] {
auto stats = _cache.stats();
auto gets_total = stats._get_hits + stats._get_misses;
auto get_hit_rate = gets_total ? ((double)stats._get_hits * 100 / gets_total) : 0;
auto sets_total = stats._set_adds + stats._set_replaces;
auto set_replace_rate = sets_total ? ((double)stats._set_replaces * 100/ sets_total) : 0;
std::cout << "items: " << _cache.size() << " "
<< std::setprecision(2) << std::fixed
<< "get: " << stats._get_hits << "/" << gets_total << " (" << get_hit_rate << "%) "
<< "set: " << stats._set_replaces << "/" << sets_total << " (" << set_replace_rate << "%) "
<< std::endl;
});
_timer.arm_periodic(std::chrono::seconds(1));
}
};
} /* namespace memcache */
int main(int ac, char** av)
{
memcache::cache cache;
memcache::ascii_protocol ascii_protocol(cache);
memcache::udp_server udp_server(ascii_protocol);
memcache::tcp_server tcp_server(cache);
memcache::stats_printer stats(cache);
app_template app;
app.add_options()
("max-datagram-size", bpo::value<int>()->default_value(memcache::udp_server::default_max_datagram_size),
"Maximum size of UDP datagram")
("stats",
"Print basic statistics periodically (every second)")
;
return app.run(ac, av, [&] {
auto&& config = app.configuration();
udp_server.set_max_datagram_size(config["max-datagram-size"].as<int>());
if (config.count("stats")) {
stats.start();
}
udp_server.start();
tcp_server.start();
});
}

View File

@@ -12,11 +12,14 @@ tests = [
'tests/udp_server',
'tests/udp_client',
'tests/blkdiscard_test',
'tests/sstring_test',
'tests/memcache/test_ascii_parser',
]
apps = [
'apps/httpd/httpd',
'apps/seastar/seastar',
'apps/memcache/memcache',
]
all_artifacts = apps + tests
@@ -43,10 +46,16 @@ core = [
'net/posix-stack.cc',
]
memcache = [
'apps/memcache/ascii.rl'
] + libnet + core
deps = {
'apps/seastar/seastar': ['apps/seastar/main.cc'] + core,
'tests/test-reactor': ['tests/test-reactor.cc'] + core,
'apps/httpd/httpd': ['apps/httpd/httpd.cc', 'apps/httpd/request_parser.rl'] + libnet + core,
'apps/memcache/memcache': ['apps/memcache/memcache.cc'] + memcache,
'tests/memcache/test_ascii_parser': ['tests/memcache/test_ascii_parser.cc'] + memcache,
'tests/fileiotest': ['tests/fileiotest.cc'] + core,
'tests/virtiotest': ['tests/virtiotest.cc'] + core + libnet,
'tests/l3_test': ['tests/l3_test.cc'] + core + libnet,
@@ -57,6 +66,7 @@ deps = {
'tests/udp_server': ['tests/udp_server.cc'] + core + libnet,
'tests/udp_client': ['tests/udp_client.cc'] + core + libnet,
'tests/blkdiscard_test': ['tests/blkdiscard_test.cc'] + core,
'tests/sstring_test': ['tests/sstring_test.cc'] + core,
}
modes = {
@@ -74,7 +84,7 @@ modes = {
},
}
libs = '-laio -lboost_program_options -lboost_system -lstdc++ -lm'
libs = '-laio -lboost_program_options -lboost_system -lstdc++ -lm -lboost_unit_test_framework'
hwloc_libs = '-lhwloc -lnuma -lpciaccess -lxml2 -lz'
warnings = [
@@ -139,9 +149,9 @@ def debug_flag(compiler):
src_with_auto = textwrap.dedent('''\
template <typename T>
struct x { auto f() {} };
x<int> a;
''')
''')
if try_compile(source = src_with_auto, flags = ['-g', '-std=gnu++1y'], compiler = compiler):
return '-g'
else:

View File

@@ -53,9 +53,9 @@ public:
get_ex();
} catch (std::exception& ex) {
std::cout << "program failed with uncaught exception: " << ex.what() << "\n";
engine.exit(1);
engine.exit(1);
}
});
});
return engine.run();
};
};

View File

@@ -45,19 +45,37 @@ future<> do_until(StopCondition&& stop_cond, AsyncAction&& action) {
return f;
}
// Invoke given action undefinitely. Next invocation starts when previous completes or fails.
// Invoke given action until it fails.
template<typename AsyncAction>
static inline
void keep_doing(AsyncAction&& action) {
future<> keep_doing(AsyncAction&& action) {
while (true) {
auto f = action();
if (!f.available()) {
f.then([action = std::forward<AsyncAction>(action)] () mutable {
keep_doing(std::forward<AsyncAction>(action));
return f.then([action = std::forward<AsyncAction>(action)] () mutable {
return keep_doing(std::forward<AsyncAction>(action));
});
return;
}
if (f.failed()) {
return std::move(f);
}
}
}
template<typename Iterator, typename AsyncAction>
static inline
future<> do_for_each(Iterator begin, Iterator end, AsyncAction&& action) {
while (begin != end) {
auto f = action(*begin++);
if (!f.available()) {
return f.then([action = std::forward<AsyncAction>(action),
begin = std::move(begin), end = std::move(end)] () mutable {
return do_for_each(std::move(begin), std::move(end), std::forward<AsyncAction>(action));
});
}
}
return make_ready_future<>();
}
#endif

View File

@@ -433,6 +433,16 @@ public:
return f;
}
future<> or_terminate() {
return rescue([] (auto get) {
try {
get();
} catch (...) {
std::terminate();
}
});
}
template <typename... U>
friend class promise;
template <typename... U, typename... A>

View File

@@ -117,6 +117,7 @@ public:
future<> expired();
void set_callback(callback_t&& callback);
void arm(clock_type::time_point until, boost::optional<clock_type::duration> period = {});
void rearm(clock_type::time_point until, boost::optional<clock_type::duration> period = {});
void arm(clock_type::duration delta);
void arm_periodic(clock_type::duration delta);
bool armed() const { return _armed; }
@@ -571,10 +572,24 @@ public:
output_stream(data_sink fd, size_t size)
: _fd(std::move(fd)), _buf(size), _size(size) {}
future<> write(const char_type* buf, size_t n);
future<> write(const char_type* buf);
future<> write(const sstring& s);
future<> flush();
private:
};
template<typename CharType>
inline
future<> output_stream<CharType>::write(const char_type* buf) {
return write(buf, strlen(buf));
}
template<typename CharType>
inline
future<> output_stream<CharType>::write(const sstring& s) {
return write(s.c_str(), s.size());
}
class file_impl {
public:
virtual ~file_impl() {}
@@ -736,7 +751,7 @@ inline
future<size_t>
reactor::write_some(pollable_fd_state& fd, const void* buffer, size_t len) {
return writeable(fd).then([this, &fd, buffer, len] () mutable {
auto r = fd.fd.send(buffer, len, 0);
auto r = fd.fd.send(buffer, len, MSG_NOSIGNAL);
if (!r) {
return write_some(fd, buffer, len);
}
@@ -982,6 +997,14 @@ void timer::arm(clock_type::time_point until, boost::optional<clock_type::durati
_queued = true;
}
inline
void timer::rearm(clock_type::time_point until, boost::optional<clock_type::duration> period) {
if (_armed) {
cancel();
}
arm(until, period);
}
inline
void timer::arm(clock_type::duration delta) {
return arm(clock_type::now() + delta);

View File

@@ -71,6 +71,11 @@ public:
}
return *this;
}
shared_ptr& operator=(T&& x) {
this->~shared_ptr();
new (this) shared_ptr(new data(std::move(x)));
return *this;
}
T& operator*() const { return _p->_value; }
T* operator->() const { return &_p->_value; }

View File

@@ -243,4 +243,21 @@ string_type to_sstring(unsigned long long value, void* = nullptr) {
return to_sstring_sprintf(value, "%llu");
}
template <typename T>
inline
std::ostream& operator<<(std::ostream& os, const std::vector<T>& v) {
bool first = true;
os << "{";
for (auto&& elem : v) {
if (!first) {
os << ", ";
} else {
first = false;
}
os << elem;
}
os << "}";
return os;
}
#endif /* SSTRING_HH_ */

30
core/vector-data-sink.hh Normal file
View File

@@ -0,0 +1,30 @@
/*
* Copyright 2014 Cloudius Systems
*/
#ifndef VECTOR_DATA_SINK_HH_
#define VECTOR_DATA_SINK_HH_
#include "core/reactor.hh"
class vector_data_sink final : public data_sink_impl {
private:
using vector_type = std::vector<temporary_buffer<char>>;
vector_type& _v;
public:
vector_data_sink(vector_type& v) : _v(v) {}
virtual future<> put(std::vector<temporary_buffer<char>> data) override {
for (auto&& buf : data) {
_v.push_back(std::move(buf));
}
return make_ready_future<>();
}
virtual future<> put(temporary_buffer<char> data) override {
_v.push_back(std::move(data));
return make_ready_future<>();
}
};
#endif

35
net/packet-data-source.hh Normal file
View File

@@ -0,0 +1,35 @@
#ifndef _PACKET_DATA_SOURCE_HH
#define _PACKET_DATA_SOURCE_HH
#include "core/reactor.hh"
#include "net/packet.hh"
namespace net {
class packet_data_source final : public data_source_impl {
size_t _cur_frag = 0;
packet _p;
public:
explicit packet_data_source(net::packet&& p)
: _p(std::move(p))
{}
virtual future<temporary_buffer<char>> get() override {
if (_cur_frag != _p.nr_frags()) {
auto& f = _p.fragments()[_cur_frag++];
return make_ready_future<temporary_buffer<char>>(
temporary_buffer<char>(f.base, f.size,
make_deleter(deleter(), [p = _p.share()] () mutable {})));
}
return make_ready_future<temporary_buffer<char>>(temporary_buffer<char>(0));
}
};
static inline
input_stream<char> as_input_stream(packet&& p) {
return input_stream<char>(data_source(std::make_unique<packet_data_source>(std::move(p))));
}
}
#endif

View File

@@ -385,7 +385,7 @@ packet::packet(fragment frag, Deleter d, packet&& x)
_impl->_frags + _impl->_nr_frags + 1);
++_impl->_nr_frags;
_impl->_frags[0] = frag;
_impl->_deleter.reset(make_deleter(std::move(_impl->_deleter), d));
_impl->_deleter = make_deleter(std::move(_impl->_deleter), d);
}
template <typename Deleter>
@@ -394,7 +394,7 @@ packet::packet(packet&& x, fragment frag, Deleter d)
: _impl(impl::allocate_if_needed(std::move(x._impl), 1)) {
_impl->_len += frag.size;
_impl->_frags[_impl->_nr_frags++] = frag;
_impl->_deleter.reset(make_deleter(std::move(_impl->_deleter), d));
_impl->_deleter = make_deleter(std::move(_impl->_deleter), d);
}
inline

View File

@@ -6,11 +6,11 @@
#ifndef UDP_HH_
#define UDP_HH_
#include <net/ip.hh>
#include <core/reactor.hh>
#include <core/shared_ptr.hh>
#include <unordered_map>
#include <assert.h>
#include "net/ip.hh"
#include "core/reactor.hh"
#include "core/shared_ptr.hh"
#include "net/api.hh"
#include "const.hh"

45
test.py Executable file
View File

@@ -0,0 +1,45 @@
#!/usr/bin/env python3
import os
import sys
import subprocess
all_tests = [
'futures_test',
'memcache/test_ascii_parser',
'sstring_test',
]
last_len = 0
def print_status(msg):
global last_len
print('\r' + ' '*last_len, end='')
last_len = len(msg)
print('\r' + msg, end='')
if __name__ == "__main__":
black_hole = open('/dev/null', 'w')
test_to_run = []
for mode in ['debug', 'release']:
for test in all_tests:
test_to_run.append(os.path.join('build', mode, 'tests', test))
test_to_run.append('tests/memcache/test.py ' + os.path.join('build', mode, 'apps', 'memcache', 'memcache'))
all_ok = True
n_total = len(test_to_run)
for n, path in enumerate(test_to_run):
prefix = '[%d/%d]' % (n + 1, n_total)
print_status('%s RUNNING %s' % (prefix, path))
if subprocess.call(path.split(' '), stdout=black_hole, stderr=black_hole):
print_status('FAILED: %s\n' % (path))
all_ok = False
else:
print_status('%s PASSED %s' % (prefix, path))
if all_ok:
print('\nOK.')
else:
print_status('')
sys.exit(1)

View File

@@ -2,16 +2,15 @@
* Copyright (C) 2014 Cloudius Systems, Ltd.
*/
#include <core/app-template.hh>
#include <core/shared_ptr.hh>
#include "core/app-template.hh"
#include "core/shared_ptr.hh"
#include "test-utils.hh"
future<> test_finally_is_called_on_success_and_failure() {
SEASTAR_TEST_CASE(test_finally_is_called_on_success_and_failure) {
auto finally1 = make_shared<bool>();
auto finally2 = make_shared<bool>();
return make_ready_future().then([] {
OK();
}).finally([=] {
*finally1 = true;
}).then([] {
@@ -19,66 +18,50 @@ future<> test_finally_is_called_on_success_and_failure() {
}).finally([=] {
*finally2 = true;
}).rescue([=] (auto get) {
if (!*finally1) {
BUG();
}
if (!*finally2) {
BUG();
}
BOOST_REQUIRE(*finally1);
BOOST_REQUIRE(*finally2);
// Should be failed.
try {
get();
BUG();
} catch (...) {
OK();
}
BOOST_REQUIRE(false);
} catch (...) {}
});
}
future<> test_exception_from_finally_fails_the_target() {
SEASTAR_TEST_CASE(test_exception_from_finally_fails_the_target) {
promise<> pr;
auto f = pr.get_future().finally([=] {
OK();
throw std::runtime_error("");
}).then([] {
BUG();
}).rescue([] (auto get) {
OK();
});
BOOST_REQUIRE(false);
}).rescue([] (auto get) {});
pr.set_value();
return f;
}
future<> test_exception_from_finally_fails_the_target_on_already_resolved() {
SEASTAR_TEST_CASE(test_exception_from_finally_fails_the_target_on_already_resolved) {
return make_ready_future().finally([=] {
OK();
throw std::runtime_error("");
}).then([] {
BUG();
}).rescue([] (auto get) {
OK();
});
BOOST_REQUIRE(false);
}).rescue([] (auto get) {});
}
future<> test_exception_thrown_from_rescue_causes_future_to_fail()
{
SEASTAR_TEST_CASE(test_exception_thrown_from_rescue_causes_future_to_fail) {
return make_ready_future().rescue([] (auto get) {
throw std::runtime_error("");
}).rescue([] (auto get) {
try {
get();
BUG();
} catch (...) {
OK();
}
BOOST_REQUIRE(false);
} catch (...) {}
});
}
future<> test_exception_thrown_from_rescue_causes_future_to_fail__async_case()
{
SEASTAR_TEST_CASE(test_exception_thrown_from_rescue_causes_future_to_fail__async_case) {
promise<> p;
auto f = p.get_future().rescue([] (auto get) {
@@ -86,10 +69,8 @@ future<> test_exception_thrown_from_rescue_causes_future_to_fail__async_case()
}).rescue([] (auto get) {
try {
get();
BUG();
} catch (...) {
OK();
}
BOOST_REQUIRE(false);
} catch (...) {}
});
p.set_value();
@@ -97,15 +78,14 @@ future<> test_exception_thrown_from_rescue_causes_future_to_fail__async_case()
return f;
}
future<> test_failing_intermediate_promise_should_fail_the_master_future() {
SEASTAR_TEST_CASE(test_failing_intermediate_promise_should_fail_the_master_future) {
promise<> p1;
promise<> p2;
auto f = p1.get_future().then([f = std::move(p2.get_future())] () mutable {
OK();
return std::move(f);
}).then([] {
BUG();
BOOST_REQUIRE(false);
});
p1.set_value();
@@ -114,23 +94,7 @@ future<> test_failing_intermediate_promise_should_fail_the_master_future() {
return f.rescue([](auto get) {
try {
get();
BUG();
} catch (...) {
OK();
}
});
}
int main(int ac, char **av)
{
return app_template().run(ac, av, [] {
run_tests(
test_finally_is_called_on_success_and_failure()
.then(test_exception_from_finally_fails_the_target)
.then(test_exception_from_finally_fails_the_target_on_already_resolved)
.then(test_exception_thrown_from_rescue_causes_future_to_fail)
.then(test_exception_thrown_from_rescue_causes_future_to_fail__async_case)
.then(test_failing_intermediate_promise_should_fail_the_master_future)
);
BOOST_REQUIRE(false);
} catch (...) {}
});
}

24
tests/memcache/test.py Executable file
View File

@@ -0,0 +1,24 @@
#!/usr/bin/env python3
import time
import subprocess
import sys
if len(sys.argv) < 2:
print('Usage: %s <path-to-memcache>' % sys.argv[0])
memcache_path = sys.argv[1]
def run(cmd):
mc = subprocess.Popen([memcache_path])
print('Memcache started.')
try:
time.sleep(0.1)
cmdline = ['tests/memcache/test_memcache.py'] + cmd
print('Running: ' + ' '.join(cmdline))
subprocess.check_call(cmdline)
finally:
print('Killing memcache...')
mc.kill()
run([])
run(['-U'])

View File

@@ -0,0 +1,232 @@
/*
* Copyright (C) 2014 Cloudius Systems, Ltd.
*/
#include <iostream>
#include <limits>
#include "tests/test-utils.hh"
#include "core/shared_ptr.hh"
#include "net/packet-data-source.hh"
#include "apps/memcache/ascii.hh"
using namespace net;
using parser_type = memcache_ascii_parser;
static packet make_packet(std::vector<std::string> chunks) {
packet p;
for (auto&& chunk : chunks) {
size_t size = chunk.size();
char* b = new char[size];
memcpy(b, chunk.c_str(), size);
p = packet(std::move(p), fragment{b, size}, [b] { delete[] b; });
}
return p;
}
static auto make_input_stream(packet&& p) {
return input_stream<char>(data_source(
std::make_unique<packet_data_source>(std::move(p))));
}
static auto parse(packet&& p) {
auto is = make_input_stream(std::move(p));
auto parser = make_shared<parser_type>();
parser->init();
return is.consume(*parser).then([parser] {
return make_ready_future<shared_ptr<parser_type>>(parser);
});
}
SEASTAR_TEST_CASE(test_set_command_is_parsed) {
return parse(make_packet({"set key 1 2 3\r\nabc\r\n"}))
.then([] (auto p) {
BOOST_REQUIRE(p->_state == parser_type::state::cmd_set);
BOOST_REQUIRE(p->_flags == 1);
BOOST_REQUIRE(p->_expiration == 2);
BOOST_REQUIRE(p->_size == 3);
BOOST_REQUIRE(p->_key == "key");
BOOST_REQUIRE(p->_blob == "abc");
});
}
SEASTAR_TEST_CASE(test_empty_data_is_parsed) {
return parse(make_packet({"set key 1 2 0\r\n\r\n"}))
.then([] (auto p) {
BOOST_REQUIRE(p->_state == parser_type::state::cmd_set);
BOOST_REQUIRE(p->_flags == 1);
BOOST_REQUIRE(p->_expiration == 2);
BOOST_REQUIRE(p->_size == 0);
BOOST_REQUIRE(p->_key == "key");
BOOST_REQUIRE(p->_blob == "");
});
}
SEASTAR_TEST_CASE(test_superflous_data_is_an_error) {
return parse(make_packet({"set key 0 0 0\r\nasd\r\n"}))
.then([] (auto p) {
BOOST_REQUIRE(p->_state == parser_type::state::error);
});
}
SEASTAR_TEST_CASE(test_not_enough_data_is_an_error) {
return parse(make_packet({"set key 0 0 3\r\n"}))
.then([] (auto p) {
BOOST_REQUIRE(p->_state == parser_type::state::error);
});
}
SEASTAR_TEST_CASE(test_u32_parsing) {
return make_ready_future<>()
.then([] {
return parse(make_packet({"set key 0 0 0\r\n\r\n"}))
.then([] (auto p) {
BOOST_REQUIRE(p->_state == parser_type::state::cmd_set);
BOOST_REQUIRE(p->_flags == 0);
});
}).then([] {
return parse(make_packet({"set key 12345 0 0\r\n\r\n"}))
.then([] (auto p) {
BOOST_REQUIRE(p->_state == parser_type::state::cmd_set);
BOOST_REQUIRE(p->_flags == 12345);
});
}).then([] {
return parse(make_packet({"set key -1 0 0\r\n\r\n"}))
.then([] (auto p) {
BOOST_REQUIRE(p->_state == parser_type::state::error);
});
}).then([] {
return parse(make_packet({"set key 1-1 0 0\r\n\r\n"}))
.then([] (auto p) {
BOOST_REQUIRE(p->_state == parser_type::state::error);
});
}).then([] {
return parse(make_packet({"set key " + std::to_string(std::numeric_limits<uint32_t>::max()) + " 0 0\r\n\r\n"}))
.then([] (auto p) {
BOOST_REQUIRE(p->_state == parser_type::state::cmd_set);
BOOST_REQUIRE(p->_flags == std::numeric_limits<uint32_t>::max());
});
});
}
SEASTAR_TEST_CASE(test_parsing_of_split_data) {
return make_ready_future<>()
.then([] {
return parse(make_packet({"set key 11", "1 222 3\r\nasd\r\n"}))
.then([] (auto p) {
BOOST_REQUIRE(p->_state == parser_type::state::cmd_set);
BOOST_REQUIRE(p->_key == "key");
BOOST_REQUIRE(p->_flags == 111);
BOOST_REQUIRE(p->_expiration == 222);
BOOST_REQUIRE(p->_size == 3);
BOOST_REQUIRE(p->_blob == "asd");
});
}).then([] {
return parse(make_packet({"set key 11", "1 22", "2 3", "\r\nasd\r\n"}))
.then([] (auto p) {
BOOST_REQUIRE(p->_state == parser_type::state::cmd_set);
BOOST_REQUIRE(p->_key == "key");
BOOST_REQUIRE(p->_flags == 111);
BOOST_REQUIRE(p->_expiration == 222);
BOOST_REQUIRE(p->_size == 3);
BOOST_REQUIRE(p->_blob == "asd");
});
}).then([] {
return parse(make_packet({"set k", "ey 11", "1 2", "2", "2 3", "\r\nasd\r\n"}))
.then([] (auto p) {
BOOST_REQUIRE(p->_state == parser_type::state::cmd_set);
BOOST_REQUIRE(p->_key == "key");
BOOST_REQUIRE(p->_flags == 111);
BOOST_REQUIRE(p->_expiration == 222);
BOOST_REQUIRE(p->_size == 3);
BOOST_REQUIRE(p->_blob == "asd");
});
}).then([] {
return parse(make_packet({"set key 111 222 3\r\n", "asd\r\n"}))
.then([] (auto p) {
BOOST_REQUIRE(p->_state == parser_type::state::cmd_set);
BOOST_REQUIRE(p->_key == "key");
BOOST_REQUIRE(p->_flags == 111);
BOOST_REQUIRE(p->_expiration == 222);
BOOST_REQUIRE(p->_size == 3);
BOOST_REQUIRE(p->_blob == "asd");
});
}).then([] {
return parse(make_packet({"set key 111 222 3\r\na", "sd\r\n"}))
.then([] (auto p) {
BOOST_REQUIRE(p->_state == parser_type::state::cmd_set);
BOOST_REQUIRE(p->_key == "key");
BOOST_REQUIRE(p->_flags == 111);
BOOST_REQUIRE(p->_expiration == 222);
BOOST_REQUIRE(p->_size == 3);
BOOST_REQUIRE(p->_blob == "asd");
});
}).then([] {
return parse(make_packet({"set key 111 222 3\r\nasd", "\r\n"}))
.then([] (auto p) {
BOOST_REQUIRE(p->_state == parser_type::state::cmd_set);
BOOST_REQUIRE(p->_key == "key");
BOOST_REQUIRE(p->_flags == 111);
BOOST_REQUIRE(p->_expiration == 222);
BOOST_REQUIRE(p->_size == 3);
BOOST_REQUIRE(p->_blob == "asd");
});
}).then([] {
return parse(make_packet({"set key 111 222 3\r\nasd\r", "\n"}))
.then([] (auto p) {
BOOST_REQUIRE(p->_state == parser_type::state::cmd_set);
BOOST_REQUIRE(p->_key == "key");
BOOST_REQUIRE(p->_flags == 111);
BOOST_REQUIRE(p->_expiration == 222);
BOOST_REQUIRE(p->_size == 3);
BOOST_REQUIRE(p->_blob == "asd");
});
});
}
SEASTAR_TEST_CASE(test_get_parsing) {
return make_ready_future<>()
.then([] {
return parse(make_packet({"get key1\r\n"}))
.then([] (auto p) {
BOOST_REQUIRE(p->_state == parser_type::state::cmd_get);
BOOST_REQUIRE_EQUAL(p->_keys, std::vector<sstring>({"key1"}));
});
}).then([] {
return parse(make_packet({"get key1 key2\r\n"}))
.then([] (auto p) {
BOOST_REQUIRE(p->_state == parser_type::state::cmd_get);
BOOST_REQUIRE_EQUAL(p->_keys, std::vector<sstring>({"key1", "key2"}));
});
}).then([] {
return parse(make_packet({"get key1 key2 key3\r\n"}))
.then([] (auto p) {
BOOST_REQUIRE(p->_state == parser_type::state::cmd_get);
BOOST_REQUIRE_EQUAL(p->_keys, std::vector<sstring>({"key1", "key2", "key3"}));
});
});
}
SEASTAR_TEST_CASE(test_multiple_requests_in_one_stream) {
auto p = make_shared<parser_type>();
auto is = make_shared(make_input_stream(make_packet({"set key1 1 1 5\r\ndata1\r\nset key2 2 2 6\r\ndata2+\r\n"})));
p->init();
return is->consume(*p).then([p] {
BOOST_REQUIRE(p->_state == parser_type::state::cmd_set);
BOOST_REQUIRE(p->_key == "key1");
BOOST_REQUIRE(p->_flags == 1);
BOOST_REQUIRE(p->_expiration == 1);
BOOST_REQUIRE(p->_size == 5);
BOOST_REQUIRE(p->_blob == "data1");
}).then([is, p] {
p->init();
return is->consume(*p).then([p] {
BOOST_REQUIRE(p->_state == parser_type::state::cmd_set);
BOOST_REQUIRE(p->_key == "key2");
BOOST_REQUIRE(p->_flags == 2);
BOOST_REQUIRE(p->_expiration == 2);
BOOST_REQUIRE(p->_size == 6);
BOOST_REQUIRE(p->_blob == "data2+");
});
});
}

120
tests/memcache/test_memcache.py Executable file
View File

@@ -0,0 +1,120 @@
#!/usr/bin/env python3
import socket
import struct
import random
import argparse
import time
import unittest
server_addr = None
call = None
def tcp_call(server_addr, msg):
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect(server_addr)
s.send(msg.encode())
data = s.recv(16*1024)
s.close()
return data
def udp_call(server_addr, msg):
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
this_req_id = random.randint(-32768, 32767)
datagram = struct.pack(">hhhh", this_req_id, 0, 1, 0) + msg.encode()
sock.sendto(datagram, server_addr)
messages = {}
n_determined = None
while True:
data, addr = sock.recvfrom(1500)
req_id, seq, n, res = struct.unpack_from(">hhhh", data)
content = data[8:]
if n_determined and n_determined != n:
raise Exception('Inconsitent number of total messages, %d and %d' % (n_determined, n))
n_determined = n
if req_id != this_req_id:
raise Exception('Invalid request id: ' + req_id + ', expected ' + this_req_id)
if seq in messages:
raise Exception('Duplicate message for seq=' + seq)
messages[seq] = content
if len(messages) == n:
break
msg = b''
for k, v in sorted(messages.items(), key=lambda e: e[0]):
msg += v
sock.close()
return msg
class TestCommands(unittest.TestCase):
def call_set(self, key, value, flags=0, expiry=0):
self.assertEqual(call('set %s %d %d %d\r\n%s\r\n' % (key, flags, expiry, len(value), value)), b'STORED\r\n')
def call_delete(self, key):
self.assertEqual(call('delete %s\r\n' % key), b'DELETED\r\n')
def test_basic_commands(self):
self.assertEqual(call('get key\r\n'), b'END\r\n')
self.assertEqual(call('set key 0 0 5\r\nhello\r\n'), b'STORED\r\n')
self.assertEqual(call('get key\r\n'), b'VALUE key 0 5\r\nhello\r\nEND\r\n')
self.assertEqual(call('delete key\r\n'), b'DELETED\r\n')
self.assertEqual(call('delete key\r\n'), b'NOT_FOUND\r\n')
self.assertEqual(call('get key\r\n'), b'END\r\n')
def test_expiry(self):
self.assertEqual(call('set key 0 1 5\r\nhello\r\n'), b'STORED\r\n')
self.assertEqual(call('get key\r\n'), b'VALUE key 0 5\r\nhello\r\nEND\r\n')
time.sleep(1)
self.assertEqual(call('get key\r\n'), b'END\r\n')
def test_expiry_at_epoch_time(self):
expiry = int(time.time()) + 1
self.assertEqual(call('set key 0 %d 5\r\nhello\r\n' % expiry), b'STORED\r\n')
self.assertEqual(call('get key\r\n'), b'VALUE key 0 5\r\nhello\r\nEND\r\n')
time.sleep(2)
self.assertEqual(call('get key\r\n'), b'END\r\n')
def test_mutliple_keys_in_get(self):
self.assertEqual(call('set key1 0 0 2\r\nv1\r\n'), b'STORED\r\n')
self.assertEqual(call('set key 0 0 2\r\nv2\r\n'), b'STORED\r\n')
self.assertEqual(call('get key1 key\r\n'), b'VALUE key1 0 2\r\nv1\r\nVALUE key 0 2\r\nv2\r\nEND\r\n')
def test_response_spanning_many_datagrams(self):
key1_data = '1' * 1000
key2_data = '2' * 1000
key3_data = '3' * 1000
self.call_set('key1', key1_data)
self.call_set('key2', key2_data)
self.call_set('key3', key3_data)
self.assertEqual(call('get key1 key2 key3\r\n').decode(),
'VALUE key1 0 %d\r\n%s\r\n' \
'VALUE key2 0 %d\r\n%s\r\n' \
'VALUE key3 0 %d\r\n%s\r\n' \
'END\r\n' % (len(key1_data), key1_data, len(key2_data), key2_data, len(key3_data), key3_data))
self.call_delete('key1')
self.call_delete('key2')
self.call_delete('key3')
if __name__ == '__main__':
parser = argparse.ArgumentParser(description="memcache protocol tests")
parser.add_argument('--server', '-s', action="store", help="server adddress in <host>:<port> format", default="localhost:11211")
parser.add_argument('--udp', '-U', action="store_true", help="Use UDP protocol")
args = parser.parse_args()
host, port = args.server.split(':')
server_addr = (host, int(port))
if args.udp:
call = lambda msg: udp_call(server_addr, msg)
else:
call = lambda msg: tcp_call(server_addr, msg)
runner = unittest.TextTestRunner()
itersuite = unittest.TestLoader().loadTestsFromTestCase(TestCommands)
runner.run(itersuite)

17
tests/sstring_test.cc Normal file
View File

@@ -0,0 +1,17 @@
/*
* Copyright 2014 Cloudius Systems
*/
#define BOOST_TEST_DYN_LINK
#define BOOST_TEST_MODULE core
#include <boost/test/included/unit_test.hpp>
#include "core/sstring.hh"
BOOST_AUTO_TEST_CASE(test_equality) {
BOOST_REQUIRE_EQUAL(sstring("aaa"), sstring("aaa"));
}
BOOST_AUTO_TEST_CASE(test_to_sstring) {
BOOST_REQUIRE_EQUAL(to_sstring(1234567), sstring("1234567"));
}

View File

@@ -6,27 +6,58 @@
#define _TEST_UTILS_HH
#include <iostream>
#include <core/future.hh>
#include <boost/test/included/unit_test.hpp>
#include "core/future.hh"
#include "core/reactor.hh"
#include "core/app-template.hh"
#define BUG() do { \
std::cerr << "ERROR @ " << __FILE__ << ":" << __LINE__ << std::endl; \
throw std::runtime_error("test failed"); \
} while (0)
using namespace boost::unit_test;
#define OK() { \
std::cerr << "OK @ " << __FILE__ << ":" << __LINE__ << std::endl; \
} while (0)
class seastar_test {
public:
seastar_test();
virtual ~seastar_test() {}
virtual const char* get_name() = 0;
virtual future<> run_test_case() = 0;
static inline
void run_tests(future<> tests_done) {
tests_done.rescue([] (auto get) {
try {
get();
exit(0);
} catch(...) {
std::terminate();
}
});
void run() {
posix_thread t([this] {
engine.when_started().then([this] {
return run_test_case();
}).rescue([] (auto get) {
try {
get();
engine.exit(0);
} catch (...) {
std::terminate();
}
});
engine.run();
});
t.join();
}
};
static std::vector<seastar_test*> tests;
seastar_test::seastar_test() {
tests.push_back(this);
}
test_suite* init_unit_test_suite(int argc, char* argv[]) {
test_suite* ts = BOOST_TEST_SUITE("seastar-tests");
for (seastar_test* test : tests) {
ts->add(boost::unit_test::make_test_case([test] { test->run(); }, test->get_name()));
}
return ts;
}
#define SEASTAR_TEST_CASE(name) \
struct name : public seastar_test { \
const char* get_name() override { return #name; } \
future<> run_test_case() override; \
}; \
static name name ## _instance; \
future<> name::run_test_case()
#endif

View File

@@ -4,11 +4,19 @@
#include "core/reactor.hh"
#include "core/print.hh"
#include "test-utils.hh"
#include <chrono>
using namespace std::chrono_literals;
#define BUG() do { \
std::cerr << "ERROR @ " << __FILE__ << ":" << __LINE__ << std::endl; \
throw std::runtime_error("test failed"); \
} while (0)
#define OK() { \
std::cerr << "OK @ " << __FILE__ << ":" << __LINE__ << std::endl; \
} while (0)
struct timer_test {
timer t1;
timer t2;