Merge branch 'master' of github.com:cloudius-systems/seastar into db

Conflicts:
	configure.py
	database.cc (added missing include)
	utils/serialize.hh (added missing inlines)
This commit is contained in:
Avi Kivity
2015-02-26 17:45:30 +02:00
21 changed files with 990 additions and 920 deletions

View File

@@ -70,7 +70,7 @@ sudo apt-get install libaio-dev ninja-build ragel libhwloc-dev libnuma-dev libpc
Installing GCC 4.9 for gnu++1y. Unlike the Fedora case above, this will
not harm the existing installation of GCC 4.8, and will install an
additional set of compilers, and additional commands named gcc-4.9,
g++-4.9, etc., that need to be used explictly, while the "gcc", "g++",
g++-4.9, etc., that need to be used explicitly, while the "gcc", "g++",
etc., commands continue to point to the 4.8 versions.
```
@@ -183,7 +183,7 @@ void f() {
}
```
Here, we initate a *get()* operation, requesting that when it completes, a
Here, we initiate a *get()* operation, requesting that when it completes, a
*put()* operation will be scheduled with an incremented value. We also
request that when the *put()* completes, some text will be printed out.
@@ -255,7 +255,7 @@ code to it.
After the I/O operation initiated by `put()` completes, it calls the
continuation associated with `f12`, which simply tells it to call the
continuation assoicated with `f2`. This continuation simply calls
continuation associated with `f2`. This continuation simply calls
`loop_to()`. Both `f12` and `f2` are freed. `loop_to()` then calls
`get()`, which starts the process all over again, allocating new versions
of `f1` and `f2`.
@@ -290,7 +290,7 @@ void f() {
}
```
When the `get_ex` variable is called as a function, it will rethrow
When the `get_ex` variable is called as a function, it will re-throw
the exception that aborted processing, and you can then apply any
needed error handling. It is essentially a transformation of
@@ -312,3 +312,26 @@ void f() {
}
}
```
### Setup notes
SeaStar is a high performance framework and tuned to get the best
performance by default. As such, we're tuned towards polling vs interrupt
driven. Our assumption is that applications written for SeaStar will be
busy handling 100,000 IOPS and beyond. Polling means that each of our
cores will consume 100% cpu even when no work is given to it.
Recommended hardware configuration for SeaStar
----------------------------------------------
* CPUs - As much as you need. SeaStar is highly friendly for multi-core and NUMA
* NICs - As fast as possible, we recommend 10G or 40G cards. It's possible to use
1G to but you may be limited by their capacity.
In addition, the more hardware queue per cpu the better for SeaStar.
Otherwise we have to emulate that in software.
* Disks - Fast SSDs with high number of IOPS.
* Client machines - Usually a single client machine can't load our servers.
Both memaslap (memcached) and WRK (httpd) cannot over load their matching
server counter parts. We recommend running the client on different machine
than the servers and use several of them.

File diff suppressed because it is too large Load Diff

View File

@@ -1,30 +0,0 @@
/*
* This file is open source software, licensed to you under the terms
* of the Apache License, Version 2.0 (the "License"). See the NOTICE file
* distributed with this work for additional information regarding copyright
* ownership. You may not use this file except in compliance with the License.
*
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
/*
* Copyright 2014 Cloudius Systems
* memcached
*/
#include "memcached.hh"
int main(int ac, char** av)
{
constexpr bool WithFlashCache = false;
memcache_instance<WithFlashCache> instance;
return instance.run(ac, av);
}

View File

@@ -20,24 +20,9 @@
#include "core/sstring.hh"
template <bool WithFlashCache>
class memcache_instance;
template <>
class memcache_instance<false> {
public:
int run(int ac, char** av);
};
template <>
class memcache_instance<true> {
public:
int run(int ac, char** av);
};
namespace memcache {
template <bool WithFlashCache>
class item;
class cache;
class item_key {

View File

@@ -122,6 +122,7 @@ urchin_tests = [
'tests/perf/perf_mutation',
'tests/perf/perf_cql_parser',
'tests/urchin/cql_query_test',
'tests/test-serialization',
]
tests = [
@@ -146,7 +147,8 @@ tests = [
'tests/allocator_test',
'tests/output_stream_test',
'tests/udp_zero_copy',
'tests/test-serialization'
'tests/shared_ptr_test',
'tests/slab_test'
] + urchin_tests
apps = [
@@ -154,7 +156,6 @@ apps = [
'seastar',
'apps/seawreck/seawreck',
'apps/memcached/memcached',
'apps/memcached/flashcached',
]
all_artifacts = apps + tests
@@ -187,7 +188,6 @@ libnet = [
'net/proxy.cc',
'net/virtio.cc',
'net/dpdk.cc',
'net/net.cc',
'net/ip.cc',
'net/ethernet.cc',
'net/arp.cc',
@@ -210,7 +210,8 @@ core = [
'util/conversions.cc',
'net/packet.cc',
'net/posix-stack.cc',
'tests/test_runner.cc'
'tests/test_runner.cc',
'net/net.cc',
]
defines = []
@@ -243,10 +244,6 @@ memcache_base = [
'apps/memcached/ascii.rl'
] + libnet + core
memcache = [
'apps/memcached/memcache.cc',
] + memcache_base
cassandra_interface = Thrift(source = 'interface/cassandra.thrift', service = 'Cassandra')
urchin_core = (['database.cc',
@@ -294,8 +291,7 @@ deps = {
'seastar': ['main.cc'] + urchin_core,
'tests/test-reactor': ['tests/test-reactor.cc'] + core,
'apps/httpd/httpd': ['apps/httpd/httpd.cc', 'apps/httpd/request_parser.rl'] + libnet + core,
'apps/memcached/memcached': ['apps/memcached/memcached.cc'] + memcache,
'apps/memcached/flashcached': ['apps/memcached/flashcached.cc'] + memcache,
'apps/memcached/memcached': ['apps/memcached/memcache.cc'] + memcache_base,
'tests/memcached/test_ascii_parser': ['tests/memcached/test_ascii_parser.cc'] + memcache_base,
'tests/fileiotest': ['tests/fileiotest.cc'] + core,
'tests/directory_test': ['tests/directory_test.cc'] + core,
@@ -317,7 +313,8 @@ deps = {
'tests/allocator_test': ['tests/allocator_test.cc', 'core/memory.cc', 'core/posix.cc'],
'tests/output_stream_test': ['tests/output_stream_test.cc'] + core + libnet,
'tests/udp_zero_copy': ['tests/udp_zero_copy.cc'] + core + libnet,
'tests/test-serialization': ['tests/test-serialization.cc'],
'tests/shared_ptr_test': ['tests/shared_ptr_test.cc'] + core,
'tests/slab_test': ['tests/slab_test.cc'] + core,
}
for t in urchin_tests:

View File

@@ -47,6 +47,7 @@ public:
};
void schedule(std::unique_ptr<task> t);
void engine_exit(std::exception_ptr eptr = {});
template <typename Func>
class lambda_task : public task {
@@ -584,7 +585,7 @@ public:
try {
get();
} catch (...) {
std::terminate();
engine_exit(std::current_exception());
}
});
}

View File

@@ -52,8 +52,10 @@ class data_source {
protected:
data_source_impl* impl() const { return _dsi.get(); }
public:
data_source() = default;
explicit data_source(std::unique_ptr<data_source_impl> dsi) : _dsi(std::move(dsi)) {}
data_source(data_source&& x) = default;
data_source& operator=(data_source&& x) = default;
future<temporary_buffer<char>> get() { return _dsi->get(); }
};
@@ -78,8 +80,10 @@ public:
class data_sink {
std::unique_ptr<data_sink_impl> _dsi;
public:
data_sink() = default;
explicit data_sink(std::unique_ptr<data_sink_impl> dsi) : _dsi(std::move(dsi)) {}
data_sink(data_sink&& x) = default;
data_sink& operator=(data_sink&& x) = default;
future<> put(std::vector<temporary_buffer<char>> data) {
return _dsi->put(std::move(data));
}
@@ -113,7 +117,10 @@ public:
void operator()(tmp_buf data, Done done);
};
using char_type = CharType;
input_stream() = default;
explicit input_stream(data_source fd, size_t buf_size = 8192) : _fd(std::move(fd)), _buf(0) {}
input_stream(input_stream&&) = default;
input_stream& operator=(input_stream&&) = default;
future<temporary_buffer<CharType>> read_exactly(size_t n);
template <typename Consumer>
future<> consume(Consumer& c);
@@ -135,18 +142,21 @@ class output_stream {
static_assert(sizeof(CharType) == 1, "must buffer stream of bytes");
data_sink _fd;
temporary_buffer<CharType> _buf;
size_t _size;
size_t _size = 0;
size_t _begin = 0;
size_t _end = 0;
bool _trim_to_size;
bool _trim_to_size = false;
private:
size_t available() const { return _end - _begin; }
size_t possibly_available() const { return _size - _begin; }
future<> split_and_put(temporary_buffer<CharType> buf);
public:
using char_type = CharType;
output_stream() = default;
output_stream(data_sink fd, size_t size, bool trim_to_size = false)
: _fd(std::move(fd)), _size(size), _trim_to_size(trim_to_size) {}
output_stream(output_stream&&) = default;
output_stream& operator=(output_stream&&) = default;
future<> write(const char_type* buf, size_t n);
future<> write(const char_type* buf);
future<> write(const sstring& s);

View File

@@ -24,6 +24,7 @@
#include "sstring.hh"
#include <sys/types.h>
#include <sys/stat.h>
#include <unistd.h>
#include <assert.h>
#include <utility>

View File

@@ -42,6 +42,12 @@
#include <rte_launch.h>
#endif
#include "prefetch.hh"
#include <exception>
#ifdef __GNUC__
#include <iostream>
#include <system_error>
#include <cxxabi.h>
#endif
#ifdef HAVE_OSV
#include <osv/newpoll.hh>
@@ -322,9 +328,9 @@ posix_file_impl::read_dma(uint64_t pos, std::vector<iovec> iov) {
}
future<file>
reactor::open_file_dma(sstring name) {
return _thread_pool.submit<syscall_result<int>>([name] {
return wrap_syscall<int>(::open(name.c_str(), O_DIRECT | O_CLOEXEC | O_CREAT | O_RDWR, S_IRWXU));
reactor::open_file_dma(sstring name, open_flags flags) {
return _thread_pool.submit<syscall_result<int>>([name, flags] {
return wrap_syscall<int>(::open(name.c_str(), O_DIRECT | O_CLOEXEC | static_cast<int>(flags), S_IRWXU));
}).then([] (syscall_result<int> sr) {
sr.throw_if_error();
return make_ready_future<file>(file(sr.result));
@@ -1523,3 +1529,47 @@ reactor_backend_osv::enable_timer(clock_type::time_point when) {
}
#endif
/**
* engine_exit() exits the reactor. It should be given a pointer to the
* exception which prompted this exit - or a null pointer if the exit
* request was not caused by any exception.
*/
void engine_exit(std::exception_ptr eptr) {
if (!eptr) {
engine().exit(0);
}
#ifndef __GNUC__
std::cerr << "Exiting on unhandled exception.\n";
#else
try {
std::rethrow_exception(eptr);
} catch(...) {
auto tp = abi::__cxa_current_exception_type();
std::cerr << "Exiting on unhandled exception ";
if (tp) {
int status;
char *demangled = abi::__cxa_demangle(tp->name(), 0, 0, &status);
std::cerr << "of type '";
if (status == 0) {
std::cerr << demangled;
free(demangled);
} else {
std::cerr << tp->name();
}
std::cerr << "'.\n";
} else {
std::cerr << "of unknown type.\n";
}
// Print more information on some known exception types
try {
throw;
} catch(const std::system_error &e) {
std::cerr << "Error " << e.code() << " (" << e.code().message() << ")\n";
} catch(const std::exception& e) {
std::cerr << e.what() << "\n";
}
}
#endif
engine().exit(1);
}

View File

@@ -61,6 +61,7 @@
#include "file.hh"
#include "semaphore.hh"
#include "core/scattered_message.hh"
#include "core/enum.hh"
#ifdef HAVE_OSV
#include <osv/newpoll.hh>
@@ -228,8 +229,11 @@ public:
class connected_socket {
std::unique_ptr<connected_socket_impl> _csi;
public:
connected_socket() {};
explicit connected_socket(std::unique_ptr<connected_socket_impl> csi)
: _csi(std::move(csi)) {}
connected_socket(connected_socket&& cs) = default;
connected_socket& operator=(connected_socket&& cs) = default;
input_stream<char> input();
output_stream<char> output();
};
@@ -595,6 +599,17 @@ public:
};
#endif /* HAVE_OSV */
enum class open_flags {
rw = O_RDWR,
ro = O_RDONLY,
wo = O_WRONLY,
create = O_CREAT,
};
inline open_flags operator|(open_flags a, open_flags b) {
return open_flags(static_cast<unsigned int>(a) | static_cast<unsigned int>(b));
}
class reactor {
private:
struct pollfn {
@@ -723,7 +738,7 @@ public:
future<> write_all(pollable_fd_state& fd, const void* buffer, size_t size);
future<file> open_file_dma(sstring name);
future<file> open_file_dma(sstring name, open_flags flags);
future<file> open_directory(sstring name);
template <typename Func>

View File

@@ -29,6 +29,7 @@
#include "sstring.hh"
#include <memory>
#include <vector>
#include <experimental/string_view>
template <typename CharType>
class scattered_message {
@@ -62,6 +63,10 @@ public:
append_static(s.begin(), s.size());
}
void append_static(const std::experimental::string_view& s) {
append_static(s.data(), s.size());
}
template <typename size_type, size_type max_size>
void append(basic_sstring<char_type, size_type, max_size> s) {
if (s.size()) {

336
core/slab.hh Normal file
View File

@@ -0,0 +1,336 @@
/*
* This file is open source software, licensed to you under the terms
* of the Apache License, Version 2.0 (the "License"). See the NOTICE file
* distributed with this work for additional information regarding copyright
* ownership. You may not use this file except in compliance with the License.
*
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
/*
* Copyright 2015 Cloudius Systems
*/
#ifndef __SLAB_ALLOCATOR__
#define __SLAB_ALLOCATOR__
#include <boost/intrusive/unordered_set.hpp>
#include <boost/intrusive/list.hpp>
#include <stdlib.h>
#include <stdio.h>
#include <stdint.h>
#include <assert.h>
#include <memory>
#include <vector>
#include <stack>
#include "core/scollectd.hh"
#include "core/align.hh"
namespace bi = boost::intrusive;
/*
* Item requirements
* - Extend it to slab_item_base.
* - First parameter of constructor must be uint8_t _slab_class_id.
* - Implement get_slab_class_id() to return _slab_class_id.
* - Implement is_unlocked() to check if Item can be evicted.
*/
class slab_item_base {
bi::list_member_hook<> _lru_link;
template<typename Item>
friend class slab_class;
};
template<typename Item>
class slab_class {
private:
std::vector<void *> _slab_pages;
std::stack<void *> _free_objects;
bi::list<slab_item_base, bi::member_hook<slab_item_base, bi::list_member_hook<>, &slab_item_base::_lru_link>> _lru;
size_t _size; // size of objects
uint8_t _slab_class_id;
private:
template<typename... Args>
inline Item* create_item(void *object, Args&&... args) {
Item *new_item = new(object) Item(_slab_class_id, std::forward<Args>(args)...);
_lru.push_front(reinterpret_cast<slab_item_base&>(*new_item));
return new_item;
}
inline void* evict_lru_item(std::function<void (Item& item_ref)>& erase_func) {
if (_lru.empty()) {
return nullptr;
}
Item& victim = reinterpret_cast<Item&>(_lru.back());
assert(victim.is_unlocked());
_lru.erase(_lru.iterator_to(reinterpret_cast<slab_item_base&>(victim)));
// WARNING: You need to make sure that erase_func will not release victim back to slab.
erase_func(victim);
return reinterpret_cast<void*>(&victim);
}
public:
slab_class(size_t size, uint8_t slab_class_id)
: _size(size)
, _slab_class_id(slab_class_id)
{
}
slab_class(slab_class&&) = default;
~slab_class() {
_lru.clear();
for (auto& slab : _slab_pages) {
free(slab);
}
}
size_t size() const {
return _size;
}
bool empty() const {
return _free_objects.empty();
}
bool has_no_slab_pages() const {
return _slab_pages.empty();
}
template<typename... Args>
Item *create(Args&&... args) {
assert(!_free_objects.empty());
auto object = _free_objects.top();
_free_objects.pop();
return create_item(object, std::forward<Args>(args)...);
}
template<typename... Args>
Item *create_from_new_page(uint64_t max_object_size, Args&&... args) {
constexpr size_t alignment = std::alignment_of<Item>::value;
void *slab_page = aligned_alloc(alignment, max_object_size);
if (!slab_page) {
throw std::bad_alloc{};
}
_slab_pages.push_back(slab_page);
assert(_size % alignment == 0);
auto objects = max_object_size / _size;
auto object = reinterpret_cast<uint8_t *>(slab_page);
for (auto i = 1u; i < objects; i++) {
object += _size;
_free_objects.push(object);
}
// first object from the allocated slab page is returned.
return create_item(slab_page, std::forward<Args>(args)...);
}
template<typename... Args>
Item *create_from_lru(std::function<void (Item& item_ref)>& erase_func, Args&&... args) {
auto victim_object = evict_lru_item(erase_func);
if (!victim_object) {
throw std::bad_alloc{};
}
return create_item(victim_object, std::forward<Args>(args)...);
}
void free_item(Item *item) {
void *object = item;
_lru.erase(_lru.iterator_to(reinterpret_cast<slab_item_base&>(*item)));
_free_objects.push(object);
}
void touch_item(Item *item) {
auto& item_ref = reinterpret_cast<slab_item_base&>(*item);
_lru.erase(_lru.iterator_to(item_ref));
_lru.push_front(item_ref);
}
void remove_item_from_lru(Item *item) {
auto& item_ref = reinterpret_cast<slab_item_base&>(*item);
_lru.erase(_lru.iterator_to(item_ref));
}
void insert_item_into_lru(Item *item) {
auto& item_ref = reinterpret_cast<slab_item_base&>(*item);
_lru.push_front(item_ref);
}
};
template<typename Item>
class slab_allocator {
private:
std::vector<size_t> _slab_class_sizes;
std::vector<slab_class<Item>> _slab_classes;
std::vector<scollectd::registration> _registrations;
std::function<void (Item& item_ref)> _erase_func;
uint64_t _max_object_size;
uint64_t _available_slab_pages;
struct collectd_stats {
uint64_t allocs;
uint64_t frees;
} _stats;
private:
void initialize_slab_classes(double growth_factor, uint64_t limit) {
constexpr size_t alignment = std::alignment_of<Item>::value;
constexpr size_t initial_size = 96;
size_t size = initial_size; // initial object size
uint8_t slab_class_id = 0U;
while (_max_object_size / size > 1) {
size = align_up(size, alignment);
_slab_class_sizes.push_back(size);
_slab_classes.emplace_back(size, slab_class_id);
size *= growth_factor;
assert(slab_class_id < std::numeric_limits<uint8_t>::max());
slab_class_id++;
}
_slab_class_sizes.push_back(_max_object_size);
_slab_classes.emplace_back(_max_object_size, slab_class_id);
}
slab_class<Item>* get_slab_class(const size_t size) {
// given a size, find slab class with binary search.
auto i = std::lower_bound(_slab_class_sizes.begin(), _slab_class_sizes.end(), size);
if (i == _slab_class_sizes.end()) {
return nullptr;
}
auto dist = std::distance(_slab_class_sizes.begin(), i);
return &_slab_classes[dist];
}
slab_class<Item>* get_slab_class(Item* item) {
auto slab_class_id = item->get_slab_class_id();
assert(slab_class_id >= 0 && slab_class_id < _slab_classes.size());
return &_slab_classes[slab_class_id];
}
void register_collectd_metrics() {
auto add = [this] (auto type_name, auto name, auto data_type, auto func) {
_registrations.push_back(
scollectd::add_polled_metric(scollectd::type_instance_id("slab",
scollectd::per_cpu_plugin_instance,
type_name, name),
scollectd::make_typed(data_type, func)));
};
add("total_operations", "malloc", scollectd::data_type::DERIVE, [&] { return _stats.allocs; });
add("total_operations", "free", scollectd::data_type::DERIVE, [&] { return _stats.frees; });
add("objects", "malloc", scollectd::data_type::GAUGE, [&] { return _stats.allocs - _stats.frees; });
}
public:
slab_allocator(double growth_factor, uint64_t limit, uint64_t max_object_size)
: _max_object_size(max_object_size)
, _available_slab_pages(limit / max_object_size)
{
initialize_slab_classes(growth_factor, limit);
register_collectd_metrics();
}
slab_allocator(double growth_factor, uint64_t limit, uint64_t max_object_size, std::function<void (Item& item_ref)> erase_func)
: _erase_func(std::move(erase_func))
, _max_object_size(max_object_size)
, _available_slab_pages(limit / max_object_size)
{
initialize_slab_classes(growth_factor, limit);
register_collectd_metrics();
}
~slab_allocator()
{
_registrations.clear();
}
/**
* Create an item from a given slab class based on requested size.
*/
template<typename... Args>
Item* create(const size_t size, Args&&... args) {
auto slab_class = get_slab_class(size);
if (!slab_class) {
throw std::bad_alloc{};
}
Item *item = nullptr;
if (!slab_class->empty()) {
item = slab_class->create(std::forward<Args>(args)...);
_stats.allocs++;
} else {
if (_available_slab_pages > 0 || slab_class->has_no_slab_pages()) {
item = slab_class->create_from_new_page(_max_object_size, std::forward<Args>(args)...);
if (_available_slab_pages > 0) {
_available_slab_pages--;
}
_stats.allocs++;
} else if (_erase_func) {
item = slab_class->create_from_lru(_erase_func, std::forward<Args>(args)...);
}
}
return item;
}
void lock_item(Item *item) {
// remove item from the lru of its slab class.
auto slab_class = get_slab_class(item);
slab_class->remove_item_from_lru(item);
}
void unlock_item(Item *item) {
// insert item into the lru of its slab class.
auto slab_class = get_slab_class(item);
slab_class->insert_item_into_lru(item);
}
/**
* Free an item back to its original slab class.
*/
void free(Item *item) {
if (item) {
auto slab_class = get_slab_class(item);
slab_class->free_item(item);
_stats.frees++;
}
}
/**
* Update item position in the LRU of its slab class.
*/
void touch(Item *item) {
if (item) {
auto slab_class = get_slab_class(item);
slab_class->touch_item(item);
}
}
/**
* Helper function: Print all available slab classes and their respective properties.
*/
void print_slab_classes() {
auto class_id = 0;
for (auto& slab_class : _slab_classes) {
size_t size = slab_class.size();
printf("slab[%3d]\tsize: %10lu\tper-slab-page: %5lu\n", class_id, size, _max_object_size / size);
class_id++;
}
}
/**
* Helper function: Useful for getting a slab class' chunk size from a size parameter.
*/
size_t class_size(const size_t size) {
auto slab_class = get_slab_class(size);
return (slab_class) ? slab_class->size() : 0;
}
};
#endif /* __SLAB_ALLOCATOR__ */

View File

@@ -8,6 +8,8 @@
#include "core/future-util.hh"
#include "cql3/column_identifier.hh"
#include <boost/algorithm/string/classification.hpp>
#include <boost/algorithm/string/split.hpp>
thread_local logging::logger dblog("database");

View File

@@ -33,8 +33,6 @@
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/ip.h>
#include <boost/asio/ip/address_v4.hpp>
#include <boost/algorithm/string.hpp>
class socket_address {
public:
@@ -58,20 +56,7 @@ struct ipv4_addr {
ipv4_addr() : ip(0), port(0) {}
ipv4_addr(uint32_t ip, uint16_t port) : ip(ip), port(port) {}
ipv4_addr(uint16_t port) : ip(0), port(port) {}
ipv4_addr(const std::string &addr) {
std::vector<std::string> items;
boost::split(items, addr, boost::is_any_of(":"));
if (items.size() == 1) {
ip = boost::asio::ip::address_v4::from_string(addr).to_ulong();
port = 0;
} else if (items.size() == 2) {
ip = boost::asio::ip::address_v4::from_string(items[0]).to_ulong();
port = std::stoul(items[1]);
} else {
throw std::invalid_argument("invalid format: " + addr);
}
}
ipv4_addr(const std::string &addr);
ipv4_addr(const socket_address &sa) {
ip = net::ntoh(sa.u.in.sin_addr.s_addr);

View File

@@ -20,12 +20,28 @@
*
*/
#include <boost/asio/ip/address_v4.hpp>
#include <boost/algorithm/string.hpp>
#include "net.hh"
#include <utility>
#include "toeplitz.hh"
using std::move;
ipv4_addr::ipv4_addr(const std::string &addr) {
std::vector<std::string> items;
boost::split(items, addr, boost::is_any_of(":"));
if (items.size() == 1) {
ip = boost::asio::ip::address_v4::from_string(addr).to_ulong();
port = 0;
} else if (items.size() == 2) {
ip = boost::asio::ip::address_v4::from_string(items[0]).to_ulong();
port = std::stoul(items[1]);
} else {
throw std::invalid_argument("invalid format: " + addr);
}
}
namespace net {
inline

View File

@@ -42,7 +42,7 @@ int main(int ac, char** av) {
auto&& config = app.configuration();
auto filepath = config["dev"].as<std::string>();
engine().open_file_dma(filepath).then([] (file f) {
engine().open_file_dma(filepath, open_flags::rw | open_flags::create).then([] (file f) {
auto ft = new file_test{std::move(f)};
ft->f.stat().then([ft] (struct stat st) mutable {

View File

@@ -31,7 +31,7 @@ struct file_test {
int main(int ac, char** av) {
static constexpr auto max = 10000;
engine().open_file_dma("testfile.tmp").then([] (file f) {
engine().open_file_dma("testfile.tmp", open_flags::rw | open_flags::create).then([] (file f) {
auto ft = new file_test{std::move(f)};
for (size_t i = 0; i < max; ++i) {
ft->par.wait().then([ft, i] {

View File

@@ -52,7 +52,7 @@ int main(int ac, char** av) {
});
app.run(ac, av, [&app] {
auto fname = app.configuration()["file"].as<std::string>();
engine().open_file_dma(fname).then([] (file f) {
engine().open_file_dma(fname, open_flags::ro | open_flags::create).then([] (file f) {
auto r = make_shared<reader>(std::move(f));
r->is.consume(*r).then([r] {
print("%d lines\n", r->count);

View File

@@ -15,16 +15,38 @@
* specific language governing permissions and limitations
* under the License.
*/
/*
* Copyright 2014 Cloudius Systems
* flashcached
* Copyright 2015 Cloudius Systems
*/
#include "memcached.hh"
#define BOOST_TEST_DYN_LINK
#define BOOST_TEST_MODULE core
int main(int ac, char** av)
{
constexpr bool WithFlashCache = true;
memcache_instance<WithFlashCache> instance;
return instance.run(ac, av);
#include <boost/test/included/unit_test.hpp>
#include "core/shared_ptr.hh"
struct A {
static bool destroyed;
A() {
destroyed = false;
}
virtual ~A() {
destroyed = true;
}
};
struct B {
virtual void x() {}
};
bool A::destroyed = false;
BOOST_AUTO_TEST_CASE(explot_dynamic_cast_use_after_free_problem) {
shared_ptr<A> p = ::make_shared<A>();
{
auto p2 = dynamic_pointer_cast<B>(p);
BOOST_ASSERT(!p2);
}
BOOST_ASSERT(!A::destroyed);
}

123
tests/slab_test.cc Normal file
View File

@@ -0,0 +1,123 @@
/*
* This file is open source software, licensed to you under the terms
* of the Apache License, Version 2.0 (the "License"). See the NOTICE file
* distributed with this work for additional information regarding copyright
* ownership. You may not use this file except in compliance with the License.
*
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
/*
* Copyright (C) 2015 Cloudius Systems, Ltd.
*
* To compile: g++ -std=c++14 slab_test.cc
*/
#include <iostream>
#include <assert.h>
#include "core/slab.hh"
static constexpr size_t max_object_size = 1024*1024;
class item : public slab_item_base {
public:
bi::list_member_hook<> _cache_link;
uint8_t _slab_class_id;
item(uint8_t slab_class_id) : _slab_class_id(slab_class_id) {}
const uint8_t get_slab_class_id() {
return _slab_class_id;
}
const bool is_unlocked() {
return true;
}
};
template<typename Item>
static void free_vector(slab_allocator<Item>& slab, std::vector<item *>& items) {
for (auto item : items) {
slab.free(item);
}
}
static void test_allocation_1(const double growth_factor, const unsigned slab_limit_size) {
slab_allocator<item> slab(growth_factor, slab_limit_size, max_object_size);
size_t size = max_object_size;
slab.print_slab_classes();
std::vector<item *> items;
assert(slab_limit_size % size == 0);
for (auto i = 0u; i < (slab_limit_size / size); i++) {
auto item = slab.create(size);
items.push_back(item);
}
assert(slab.create(size) == nullptr);
free_vector<item>(slab, items);
std::cout << __FUNCTION__ << " done!\n";
}
static void test_allocation_2(const double growth_factor, const unsigned slab_limit_size) {
slab_allocator<item> slab(growth_factor, slab_limit_size, max_object_size);
size_t size = 1024;
std::vector<item *> items;
auto allocations = 0u;
for (;;) {
auto item = slab.create(size);
if (!item) {
break;
}
items.push_back(item);
allocations++;
}
auto class_size = slab.class_size(size);
auto per_slab_page = max_object_size / class_size;
auto available_slab_pages = slab_limit_size / max_object_size;
assert(allocations == (per_slab_page * available_slab_pages));
free_vector<item>(slab, items);
std::cout << __FUNCTION__ << " done!\n";
}
static void test_allocation_with_lru(const double growth_factor, const unsigned slab_limit_size) {
bi::list<item, bi::member_hook<item, bi::list_member_hook<>, &item::_cache_link>> _cache;
unsigned evictions = 0;
slab_allocator<item> slab(growth_factor, slab_limit_size, max_object_size,
[&](item& item_ref) { _cache.erase(_cache.iterator_to(item_ref)); evictions++; });
size_t size = max_object_size;
auto max = slab_limit_size / max_object_size;
for (auto i = 0u; i < max * 1000; i++) {
auto item = slab.create(size);
assert(item != nullptr);
_cache.push_front(*item);
}
assert(evictions == max * 999);
_cache.clear();
std::cout << __FUNCTION__ << " done!\n";
}
int main(int ac, char** av) {
test_allocation_1(1.25, 5*1024*1024);
test_allocation_2(1.07, 5*1024*1024); // 1.07 is the growth factor used by facebook.
test_allocation_with_lru(1.25, 5*1024*1024);
return 0;
}

View File

@@ -30,12 +30,14 @@
class UTFDataFormatException { };
class EOFException { };
inline
void serialize_bool(std::ostream& out, bool b) {
out.put(b ? (char)1 : (char)0);
}
constexpr size_t serialize_bool_size = 1;
static constexpr size_t serialize_bool_size = 1;
inline
bool deserialize_bool(std::istream& in) {
char ret;
if (in.get(ret)) {
@@ -45,20 +47,24 @@ bool deserialize_bool(std::istream& in) {
}
}
inline
void serialize_int8(std::ostream& out, uint8_t val) {
out.put(val);
}
inline
void serialize_int8(std::ostream& out, int8_t val) {
out.put(val);
}
constexpr size_t serialize_int8_size = 1;
static constexpr size_t serialize_int8_size = 1;
inline
void serialize_int8(std::ostream& out, char val) {
out.put(val);
}
inline
int8_t deserialize_int8(std::istream& in) {
char ret;
if (in.get(ret)) {
@@ -68,15 +74,18 @@ int8_t deserialize_int8(std::istream& in) {
}
}
inline
void serialize_int16(std::ostream& out, uint16_t val) {
out.put((char)((val >> 8) & 0xFF));
out.put((char)((val >> 0) & 0xFF));
}
inline
void serialize_int16(std::ostream& out, int16_t val) {
serialize_int16(out, (uint16_t) val);
}
inline
int16_t deserialize_int16(std::istream& in) {
char a1, a2;
in.get(a1);
@@ -87,8 +96,9 @@ int16_t deserialize_int16(std::istream& in) {
return ((int16_t)(uint8_t)a1 << 8) | ((int16_t)(uint8_t)a2 << 0);
}
constexpr size_t serialize_int16_size = 2;
static constexpr size_t serialize_int16_size = 2;
inline
void serialize_int32(std::ostream& out, uint32_t val) {
out.put((char)((val >> 24) & 0xFF));
out.put((char)((val >> 16) & 0xFF));
@@ -96,12 +106,14 @@ void serialize_int32(std::ostream& out, uint32_t val) {
out.put((char)((val >> 0) & 0xFF));
}
inline
void serialize_int32(std::ostream& out, int32_t val) {
serialize_int32(out, (uint32_t) val);
}
constexpr size_t serialize_int32_size = 4;
static constexpr size_t serialize_int32_size = 4;
inline
int32_t deserialize_int32(std::istream& in) {
char a1, a2, a3, a4;
in.get(a1);
@@ -114,6 +126,7 @@ int32_t deserialize_int32(std::istream& in) {
((int32_t)(uint8_t)a4 << 0);
}
inline
void serialize_int64(std::ostream& out, uint64_t val) {
out.put((char)((val >> 56) & 0xFF));
out.put((char)((val >> 48) & 0xFF));
@@ -125,12 +138,14 @@ void serialize_int64(std::ostream& out, uint64_t val) {
out.put((char)((val >> 0) & 0xFF));
}
inline
void serialize_int64(std::ostream& out, int64_t val) {
serialize_int64(out, (uint64_t) val);
}
constexpr size_t serialize_int64_size = 8;
static constexpr size_t serialize_int64_size = 8;
inline
int64_t deserialize_int64(std::istream& in) {
char a1, a2, a3, a4, a5, a6, a7, a8;
in.get(a1);
@@ -159,6 +174,7 @@ int64_t deserialize_int64(std::istream& in) {
// http://docs.oracle.com/javase/7/docs/api/java/io/DataInput.html#modified-utf-8)
// For now we'll just assume those aren't in the string...
// TODO: fix the compatibility with Java even in this case.
inline
void serialize_string(std::ostream& out, const sstring& s) {
// Java specifies that nulls in the string need to be replaced by the
// two bytes 0xC0, 0x80. Let's not bother with such transformation
@@ -177,12 +193,14 @@ void serialize_string(std::ostream& out, const sstring& s) {
out.write(s.c_str(), s.size());
}
inline
size_t serialize_string_size(const sstring& s) {;
// As above, this code is missing the case of modified utf-8
return serialize_int16_size + s.size();
}
inline
void serialize_string(std::ostream& out, const char *s) {
// TODO: like above, need to change UTF-8 when above 16-bit.
auto len = strlen(s);
@@ -195,6 +213,7 @@ void serialize_string(std::ostream& out, const char *s) {
out.write(s, len);
}
inline
sstring deserialize_string(std::istream& in) {
int len = deserialize_int16(in);
sstring ret(sstring::initialized_later(), len);