diff --git a/README.md b/README.md index dcf61804cb..0dcb65d038 100644 --- a/README.md +++ b/README.md @@ -70,7 +70,7 @@ sudo apt-get install libaio-dev ninja-build ragel libhwloc-dev libnuma-dev libpc Installing GCC 4.9 for gnu++1y. Unlike the Fedora case above, this will not harm the existing installation of GCC 4.8, and will install an additional set of compilers, and additional commands named gcc-4.9, -g++-4.9, etc., that need to be used explictly, while the "gcc", "g++", +g++-4.9, etc., that need to be used explicitly, while the "gcc", "g++", etc., commands continue to point to the 4.8 versions. ``` @@ -183,7 +183,7 @@ void f() { } ``` -Here, we initate a *get()* operation, requesting that when it completes, a +Here, we initiate a *get()* operation, requesting that when it completes, a *put()* operation will be scheduled with an incremented value. We also request that when the *put()* completes, some text will be printed out. @@ -255,7 +255,7 @@ code to it. After the I/O operation initiated by `put()` completes, it calls the continuation associated with `f12`, which simply tells it to call the -continuation assoicated with `f2`. This continuation simply calls +continuation associated with `f2`. This continuation simply calls `loop_to()`. Both `f12` and `f2` are freed. `loop_to()` then calls `get()`, which starts the process all over again, allocating new versions of `f1` and `f2`. @@ -290,7 +290,7 @@ void f() { } ``` -When the `get_ex` variable is called as a function, it will rethrow +When the `get_ex` variable is called as a function, it will re-throw the exception that aborted processing, and you can then apply any needed error handling. It is essentially a transformation of @@ -312,3 +312,26 @@ void f() { } } ``` + +### Setup notes + +SeaStar is a high performance framework and tuned to get the best +performance by default. As such, we're tuned towards polling vs interrupt +driven. Our assumption is that applications written for SeaStar will be +busy handling 100,000 IOPS and beyond. Polling means that each of our +cores will consume 100% cpu even when no work is given to it. + + +Recommended hardware configuration for SeaStar +---------------------------------------------- + +* CPUs - As much as you need. SeaStar is highly friendly for multi-core and NUMA +* NICs - As fast as possible, we recommend 10G or 40G cards. It's possible to use + 1G to but you may be limited by their capacity. + In addition, the more hardware queue per cpu the better for SeaStar. + Otherwise we have to emulate that in software. +* Disks - Fast SSDs with high number of IOPS. +* Client machines - Usually a single client machine can't load our servers. + Both memaslap (memcached) and WRK (httpd) cannot over load their matching + server counter parts. We recommend running the client on different machine + than the servers and use several of them. diff --git a/apps/memcached/memcache.cc b/apps/memcached/memcache.cc index 962c212c55..881f092cc3 100644 --- a/apps/memcached/memcache.cc +++ b/apps/memcached/memcache.cc @@ -16,7 +16,7 @@ * under the License. */ /* - * Copyright 2014 Cloudius Systems + * Copyright 2014-2015 Cloudius Systems */ #include @@ -35,13 +35,14 @@ #include "core/units.hh" #include "core/distributed.hh" #include "core/vector-data-sink.hh" +#include "core/bitops.hh" +#include "core/slab.hh" +#include "core/align.hh" #include "net/api.hh" #include "net/packet-data-source.hh" #include "apps/memcached/ascii.hh" -#include "core/bitops.hh" #include "memcached.hh" #include -#include #define PLATFORM "seastar" #define VERSION "v1.0" @@ -51,249 +52,131 @@ using namespace net; namespace bi = boost::intrusive; - -namespace flashcache { - -constexpr int block_size_shift = 12; -constexpr uint32_t block_size = 1 << block_size_shift; - -struct block { -private: - uint32_t _blk_id; // granularity: block_size -public: - block() = default; - block(uint32_t blk_id) : _blk_id(blk_id) {} - uint64_t get_addr() { return _blk_id * block_size; } -}; - -struct devfile { -private: - file _f; -public: - devfile(file&& f) : _f(std::move(f)) {} - - file& f() { - return _f; - } - - friend class subdevice; -}; - -class subdevice { - foreign_ptr> _dev; - uint64_t _offset; - uint64_t _end; - std::queue _free_blocks; - semaphore _par = { 1000 }; -public: - subdevice(foreign_ptr> dev, uint64_t offset, uint64_t length) - : _dev(std::move(dev)) - , _offset(offset) - , _end(offset + length) - { - auto blks = length / block_size; - for (auto blk_id = 0U; blk_id < blks; blk_id++) { - _free_blocks.push(blk_id); - } - } - - block allocate(void) { - // FIXME: handle better the case where there is no disk space left for allocations. - assert(!_free_blocks.empty()); - block blk = _free_blocks.front(); - _free_blocks.pop(); - return blk; - } - - void free(block blk) { - auto actual_blk_addr = _offset + blk.get_addr(); - assert(actual_blk_addr + block_size <= _end); - // Issue trimming operation on the block being freed. - _dev->_f.discard(actual_blk_addr, block_size).finally([this, blk]() mutable { - _free_blocks.push(blk); - }); - } - - future read(block& blk, void* buffer) { - auto actual_blk_addr = _offset + blk.get_addr(); - assert(actual_blk_addr + block_size <= _end); - return _dev->_f.dma_read(actual_blk_addr, buffer, block_size); - } - - future write(block& blk, const void* buffer) { - auto actual_blk_addr = _offset + blk.get_addr(); - assert(actual_blk_addr + block_size <= _end); - return _dev->_f.dma_write(actual_blk_addr, buffer, block_size); - } - - future<> wait() { - return _par.wait(); - } - - void signal() { - _par.signal(); - } -}; - -} /* namespace flashcache */ - namespace memcache { +static constexpr double default_slab_growth_factor = 1.25; +static constexpr uint64_t default_slab_page_size = 1UL*MB; +static constexpr uint64_t default_per_cpu_slab_size = 64UL*MB; +static __thread slab_allocator* slab; + template using optional = boost::optional; +struct expiration { + static constexpr uint32_t seconds_in_a_month = 60U * 60 * 24 * 30; + uint32_t _time; -struct memcache_item_base { - memcache_item_base(uint32_t size) {} -}; + expiration() : _time(0U) {} -enum class item_state { - MEM, - TO_MEM_DISK, // transition period from MEM to MEM_DISK - MEM_DISK, - DISK, - ERASED, -}; - -struct flashcache_item_base { -private: - item_state _state = item_state::MEM; - uint32_t _size; - // NOTE: vector must be sorted, i.e. first block of data should be in the front of the list. - std::vector _used_blocks; - flashcache::subdevice* _subdev = nullptr; -public: - semaphore _lookup_sem = { 1 }; - - flashcache_item_base(uint32_t size) : _size(size) {} - - ~flashcache_item_base() { - if (_used_blocks.empty()) { - return; + expiration(uint32_t seconds) { + if (seconds == 0U) { + _time = 0U; // means never expire. + } else if (seconds <= seconds_in_a_month) { + _time = seconds + time(0); // from delta + } else { + _time = seconds; // from real time } - assert(_subdev != nullptr); - // Needed to free used blocks only when the underlying item is destroyed, - // otherwise they could be reused while there is I/O in progress to them. - for (auto& blk : _used_blocks) { - _subdev->free(blk); - } - _used_blocks.clear(); } - void set_subdevice(flashcache::subdevice* subdev) { - assert(_subdev == nullptr); - _subdev = subdev; + bool ever_expires() { + return _time; } - bool is_present() { - return (_state == item_state::MEM || _state == item_state::TO_MEM_DISK || - _state == item_state::MEM_DISK); - } - - item_state get_state() { - return _state; - } - - void set_state(item_state state) { - _state = state; - } - - uint32_t size() { - return _size; - } - - size_t used_blocks_size() { - return _used_blocks.size(); - } - - void used_blocks_clear() { - _used_blocks.clear(); - } - - bool used_blocks_empty() { - return _used_blocks.empty(); - } - - void used_blocks_resize(size_t new_size) { - _used_blocks.resize(new_size); - } - - flashcache::block used_block(unsigned int index) { - assert(index < _used_blocks.size()); - return _used_blocks[index]; - } - - void use_block(unsigned int index, flashcache::block blk) { - assert(index < _used_blocks.size()); - _used_blocks[index] = blk; + clock_type::time_point to_time_point() { + return clock_type::time_point(std::chrono::seconds(_time)); } }; -template -class item : public std::conditional::type { +class item : public slab_item_base { public: - using item_type = item; using version_type = uint64_t; using time_point = clock_type::time_point; using duration = clock_type::duration; + static constexpr uint8_t field_alignment = alignof(void*); private: using hook_type = bi::unordered_set_member_hook<>; // TODO: align shared data to cache line boundary - item_key _key; - sstring _data; - const sstring _ascii_prefix; version_type _version; - int _ref_count; hook_type _cache_link; - bi::list_member_hook<> _lru_link; bi::list_member_hook<> _timer_link; - time_point _expiry; - template + size_t _key_hash; + expiration _expiry; + uint32_t _value_size; + uint16_t _ref_count; + uint8_t _key_size; + uint8_t _ascii_prefix_size; + uint8_t _slab_class_id; + char _unused[3]; + char _data[]; // layout: data=key, (data+key_size)=ascii_prefix, (data+key_size+ascii_prefix_size)=value. friend class cache; - friend class memcache_cache_base; - friend class flashcache_cache_base; public: - item(item_key&& key, sstring&& ascii_prefix, sstring&& data, clock_type::time_point expiry, version_type version = 1) - : std::conditional::type(data.size()) - , _key(std::move(key)) - , _data(std::move(data)) - , _ascii_prefix(std::move(ascii_prefix)) - , _version(version) - , _ref_count(0) + item(uint8_t slab_class_id, item_key&& key, sstring&& ascii_prefix, + sstring&& value, expiration expiry, version_type version = 1) + : _version(version) + , _key_hash(key.hash()) , _expiry(expiry) + , _value_size(value.size()) + , _ref_count(0) + , _key_size(key.key().size()) + , _ascii_prefix_size(ascii_prefix.size()) + , _slab_class_id(slab_class_id) { + assert(_key_size <= std::numeric_limits::max()); + assert(_ascii_prefix_size <= std::numeric_limits::max()); + // storing key + memcpy(_data, key.key().c_str(), _key_size); + // storing ascii_prefix + memcpy(_data + align_up(_key_size, field_alignment), ascii_prefix.c_str(), _ascii_prefix_size); + // storing value + memcpy(_data + align_up(_key_size, field_alignment) + align_up(_ascii_prefix_size, field_alignment), + value.c_str(), _value_size); } item(const item&) = delete; item(item&&) = delete; clock_type::time_point get_timeout() { - return _expiry; + return _expiry.to_time_point(); } version_type version() { return _version; } - sstring& data() { - return _data; + const std::experimental::string_view key() const { + return std::experimental::string_view(_data, _key_size); } - const sstring& ascii_prefix() { - return _ascii_prefix; + const std::experimental::string_view ascii_prefix() const { + const char *p = _data + align_up(_key_size, field_alignment); + return std::experimental::string_view(p, _ascii_prefix_size); } - const sstring& key() { - return _key.key(); + const std::experimental::string_view value() const { + const char *p = _data + align_up(_key_size, field_alignment) + + align_up(_ascii_prefix_size, field_alignment); + return std::experimental::string_view(p, _value_size); + } + + size_t key_size() const { + return _key_size; + } + + size_t ascii_prefix_size() const { + return _ascii_prefix_size; + } + + size_t value_size() const { + return _value_size; } optional data_as_integral() { - auto str = _data.c_str(); + auto str = value().data(); if (str[0] == '-') { return {}; } - auto len = _data.size(); + auto len = _value_size; // Strip trailing space while (len && str[len - 1] == ' ') { @@ -309,46 +192,67 @@ public: // needed by timer_set bool cancel() { - assert(false); return false; } - friend bool operator==(const item_type &a, const item_type &b) { - return a._key == b._key; + // get_slab_class_id() and is_unlocked() are methods required by slab class. + const uint8_t get_slab_class_id() { + return _slab_class_id; + } + bool is_unlocked() { + return _ref_count == 1; } - friend std::size_t hash_value(const item_type &i) { - return std::hash()(i._key); + friend bool operator==(const item &a, const item &b) { + return (a._key_hash == b._key_hash) && + (a._key_size == b._key_size) && + (memcmp(a._data, b._data, a._key_size) == 0); } - friend inline void intrusive_ptr_add_ref(item_type* it) { + friend std::size_t hash_value(const item &i) { + return i._key_hash; + } + + friend inline void intrusive_ptr_add_ref(item* it) { + assert(it->_ref_count >= 0); ++it->_ref_count; - } - - friend inline void intrusive_ptr_release(item_type* it) { - if (--it->_ref_count == 0) { - delete it; + if (it->_ref_count == 2) { + slab->lock_item(it); } } - template + friend inline void intrusive_ptr_release(item* it) { + --it->_ref_count; + if (it->_ref_count == 1) { + slab->unlock_item(it); + } else if (it->_ref_count == 0) { + slab->free(it); + } + assert(it->_ref_count >= 0); + } + friend class item_key_cmp; }; -template struct item_key_cmp { - bool operator()(const item_key& key, const item& it) const { - return key == it._key; +private: + bool compare(const item_key& key, const item& it) const { + return (it._key_hash == key.hash()) && + (it._key_size == key.key().size()) && + (memcmp(it._data, key.key().c_str(), it._key_size) == 0); + } +public: + bool operator()(const item_key& key, const item& it) const { + return compare(key, it); } - bool operator()(const item& it, const item_key& key) const { - return key == it._key; + bool operator()(const item& it, const item_key& key) const { + return compare(key, it); } }; -template -using item_ptr = foreign_ptr>>; +using item_ptr = foreign_ptr>; struct cache_stats { size_t _get_hits {}; @@ -370,9 +274,6 @@ struct cache_stats { size_t _resize_failure {}; size_t _size {}; size_t _reclaims{}; - // flashcache-only stats. - size_t _loads{}; - size_t _stores{}; void operator+=(const cache_stats& o) { _get_hits += o._get_hits; @@ -394,8 +295,6 @@ struct cache_stats { _resize_failure += o._resize_failure; _size += o._size; _reclaims += o._reclaims; - _loads += o._loads; - _stores += o._stores; } }; @@ -423,348 +322,69 @@ struct item_insertion_data { item_key key; sstring ascii_prefix; sstring data; - clock_type::time_point expiry; + expiration expiry; }; -struct memcache_cache_base { +class cache { private: - using item_type = item; - using item_lru_list = bi::list, &item_type::_lru_link>>; - item_lru_list _lru; - cache_stats _stats; -public: - void do_setup(foreign_ptr> dev, uint64_t offset, uint64_t length) {} - - void do_erase(item_type& item_ref) { - _lru.erase(_lru.iterator_to(item_ref)); - } - - size_t do_reclaim(size_t target) { - return 0; - } - - future<> do_get(boost::intrusive_ptr item) { - auto& item_ref = *item; - _lru.erase(_lru.iterator_to(item_ref)); - _lru.push_front(item_ref); - return make_ready_future<>(); - } - - void do_set(item_type& new_item_ref) {} - - template - friend class cache; -}; - -struct flashcache_cache_base { -private: - using item_type = item; - using item_lru_list = bi::list, &item_type::_lru_link>>; - item_lru_list _lru; // mem_lru - item_lru_list _mem_disk_lru; - item_lru_list _disk_lru; - uint64_t _total_mem = 0; // total bytes from items' value in mem lru. - uint64_t _total_mem_disk = 0; // total bytes from items' value in mem_disk lru. - std::unique_ptr _subdev; - cache_stats _stats; - - future<> load_item_data(boost::intrusive_ptr item); - future<> store_item_data(boost::intrusive_ptr item); -public: - flashcache::subdevice& get_subdevice() { - auto& subdev_ref = *_subdev.get(); - return subdev_ref; - } - - void do_setup(foreign_ptr> dev, uint64_t offset, uint64_t length) { - _subdev = std::make_unique(std::move(dev), offset, length); - } - - void do_erase(item_type& item_ref) { - switch(item_ref.get_state()) { - case item_state::MEM: - _lru.erase(_lru.iterator_to(item_ref)); - _total_mem -= item_ref.size(); - break; - case item_state::TO_MEM_DISK: - _total_mem_disk -= item_ref.size(); - break; - case item_state::MEM_DISK: - _mem_disk_lru.erase(_mem_disk_lru.iterator_to(item_ref)); - _total_mem_disk -= item_ref.size(); - break; - case item_state::DISK: - _disk_lru.erase(_disk_lru.iterator_to(item_ref)); - break; - default: - assert(0); - } - item_ref.set_state(item_state::ERASED); - } - - size_t do_reclaim(size_t target) { - size_t reclaimed_so_far = 0; - - auto i = this->_mem_disk_lru.end(); - if (i == this->_mem_disk_lru.begin()) { - return 0; - } - - --i; - - bool done = false; - do { - item_type& victim = *i; - if (i != this->_mem_disk_lru.begin()) { - --i; - } else { - done = true; - } - - if (victim._ref_count == 1) { - auto item_data_size = victim.size(); - - assert(victim.data().size() == item_data_size); - _mem_disk_lru.erase(_mem_disk_lru.iterator_to(victim)); - victim.data().reset(); - assert(victim.data().size() == 0); - victim.set_state(item_state::DISK); - _disk_lru.push_front(victim); - reclaimed_so_far += item_data_size; - _total_mem_disk -= item_data_size; - - if (reclaimed_so_far >= target) { - done = true; - } - } - } while (!done); - return reclaimed_so_far; - } - - future<> do_get(boost::intrusive_ptr item) { - return load_item_data(item); - } - - // TODO: Handle storing/loading of zero-length items. - void do_set(item_type& new_item_ref) { - _total_mem += new_item_ref.size(); - new_item_ref.set_subdevice(_subdev.get()); - - // Adjust items between mem (20%) and mem_disk (80%) lru lists. - // With that ratio, items will be constantly scheduled to be stored on disk, - // and that's good because upon memory pressure, we would have enough items - // to satisfy the amount of memory asked to be reclaimed. - if (_total_mem >= 1*MB) { - auto total = _total_mem + _total_mem_disk; - auto total_mem_disk_perc = _total_mem_disk * 100 / total; - if (total_mem_disk_perc < 80) { - // Store least recently used item from lru into mem_disk lru. - item_type& item_ref = _lru.back(); - auto item = boost::intrusive_ptr(&item_ref); - auto item_data_size = item->size(); - - assert(item->get_state() == item_state::MEM); - _lru.erase(_lru.iterator_to(item_ref)); - item->set_state(item_state::TO_MEM_DISK); - store_item_data(item); - _total_mem -= item_data_size; - _total_mem_disk += item_data_size; - } - } - } - - template - friend class cache; -}; - -// -// Load item data from disk into memory. -// NOTE: blocks used aren't freed because item will be moved to _mem_disk_lru. -// -future<> flashcache_cache_base::load_item_data(boost::intrusive_ptr item) { - if (item->is_present()) { - auto& item_ref = *item; - switch(item->get_state()) { - case item_state::MEM: - _lru.erase(_lru.iterator_to(item_ref)); - _lru.push_front(item_ref); - break; - case item_state::TO_MEM_DISK: - break; - case item_state::MEM_DISK: - _mem_disk_lru.erase(_mem_disk_lru.iterator_to(item_ref)); - _mem_disk_lru.push_front(item_ref); - break; - default: - assert(0); - } - return make_ready_future<>(); - } - return item->_lookup_sem.wait().then([this, item] { - if (item->is_present()) { - return make_ready_future<>(); - } - assert(item->get_state() == item_state::DISK); - - flashcache::subdevice& subdev = this->get_subdevice(); - auto sem = make_lw_shared({ 0 }); - auto& item_data = item->data(); - auto item_size = item->size(); - auto blocks_to_load = item->used_blocks_size(); - assert(item_data.empty()); - assert(item_size >= 1); - assert(blocks_to_load == (item_size + (flashcache::block_size - 1)) / flashcache::block_size); - - auto to_read = item_size; - item_data = sstring(sstring::initialized_later(), item_size); - for (auto i = 0U; i < blocks_to_load; ++i) { - auto read_size = std::min(to_read, flashcache::block_size); - - subdev.wait().then([&subdev, sem, item, read_size, i] { - // If the item is already erased no need to schedule new IOs, just signal the semaphores. - if (item->get_state() == item_state::ERASED) { - return make_ready_future<>(); - } - // TODO: Avoid allocation and copying by directly using item's data (should be aligned). - auto rbuf = allocate_aligned_buffer(flashcache::block_size, flashcache::block_size); - auto rb = rbuf.get(); - flashcache::block blk = item->used_block(i); - - return subdev.read(blk, rb).then( - [item, read_size, rbuf = std::move(rbuf), i] (size_t ret) mutable { - assert(ret == flashcache::block_size); - char *data = item->data().begin(); - assert(data != nullptr); - assert((i * flashcache::block_size + read_size) <= item->data().size()); // overflow check - memcpy(data + (i * flashcache::block_size), rbuf.get(), read_size); - }).or_terminate(); - }).finally([&subdev, sem] { - subdev.signal(); - sem->signal(1); - }); - to_read -= read_size; - } - - return sem->wait(blocks_to_load).then([this, item] () mutable { - auto& item_data = item->data(); - auto item_data_size = item_data.size(); - assert(item_data_size == item->size()); - - if (item->get_state() != item_state::ERASED) { - // Adjusting LRU: item is moved from _disk_lru to _mem_disk_lru. - auto& item_ref = *item; - _disk_lru.erase(_disk_lru.iterator_to(item_ref)); - item->set_state(item_state::MEM_DISK); - _mem_disk_lru.push_front(item_ref); - _total_mem_disk += item_data_size; - } - this->_stats._loads++; - }); - }).finally([item] { - item->_lookup_sem.signal(); - }); -} - -// -// Store item data from memory into disk. -// NOTE: Item data remains present in memory. -// -future<> flashcache_cache_base::store_item_data(boost::intrusive_ptr item) { - assert(item->get_state() == item_state::TO_MEM_DISK); - - flashcache::subdevice& subdev = this->get_subdevice(); - auto sem = make_lw_shared({ 0 }); - auto& item_data = item->data(); - auto item_size = item->size(); - auto blocks_to_store = (item_size + (flashcache::block_size - 1)) / flashcache::block_size; - assert(item_data.size() == item_size); - assert(item->used_blocks_empty()); - assert(blocks_to_store >= 1); - - auto to_write = item_size; - item->used_blocks_resize(blocks_to_store); - for (auto i = 0U; i < blocks_to_store; ++i) { - auto write_size = std::min(to_write, flashcache::block_size); - - subdev.wait().then([&subdev, sem, item, write_size, i] { - if (item->get_state() == item_state::ERASED) { - return make_ready_future<>(); - } - auto wbuf = allocate_aligned_buffer(flashcache::block_size, flashcache::block_size); - const char *data = item->data().c_str(); - assert(data != nullptr); - assert((i * flashcache::block_size + write_size) <= item->data().size()); // overflow check - memcpy(wbuf.get(), data + (i * flashcache::block_size), write_size); - auto wb = wbuf.get(); - flashcache::block blk = subdev.allocate(); - item->use_block(i, blk); - - return subdev.write(blk, wb).then([] (size_t ret) mutable { - assert(ret == flashcache::block_size); - }).or_terminate(); - }).finally([&subdev, sem] { - subdev.signal(); - sem->signal(1); - }); - to_write -= write_size; - } - - return sem->wait(blocks_to_store).then([this, item] () mutable { - // NOTE: Item was removed previously from mem lru so as to avoid races, i.e. - // upon another set, the same item would be popped from the back of the lru. - auto& item_data = item->data(); - auto item_data_size = item_data.size(); - assert(item_data_size == item->size()); - - if (item->get_state() != item_state::ERASED) { - // Adjusting LRU: item is moved from mem lru to mem_disk lru. - auto& item_ref = *item; - item->set_state(item_state::MEM_DISK); - _mem_disk_lru.push_front(item_ref); - } - this->_stats._stores++; - }); -} - -template -class cache : public std::conditional::type { -private: - using item_type = item; - using cache_type = bi::unordered_set, + using cache_type = bi::unordered_set, bi::power_2_buckets, bi::constant_time_size>; using cache_iterator = typename cache_type::iterator; - using cache_bucket = typename cache_type::bucket_type; static constexpr size_t initial_bucket_count = 1 << 10; static constexpr float load_factor = 0.75f; size_t _resize_up_threshold = load_factor * initial_bucket_count; - cache_bucket* _buckets; + cache_type::bucket_type* _buckets; cache_type _cache; - timer_set _alive; + timer_set _alive; timer<> _timer; + cache_stats _stats; timer<> _flush_timer; - memory::reclaimer _reclaimer; private: - size_t item_footprint(item_type& item_ref) { - return sizeof(item_type) + item_ref._data.size() + item_ref._ascii_prefix.size() + item_ref.key().size(); + size_t item_size(item& item_ref) { + constexpr size_t field_alignment = alignof(void*); + return sizeof(item) + + align_up(item_ref.key_size(), field_alignment) + + align_up(item_ref.ascii_prefix_size(), field_alignment) + + item_ref.value_size(); } - template - void erase(item_type& item_ref) { + size_t item_size(item_insertion_data& insertion) { + constexpr size_t field_alignment = alignof(void*); + auto size = sizeof(item) + + align_up(insertion.key.key().size(), field_alignment) + + align_up(insertion.ascii_prefix.size(), field_alignment) + + insertion.data.size(); +#ifdef __DEBUG__ + static bool print_item_footprint = true; + if (print_item_footprint) { + print_item_footprint = false; + std::cout << __FUNCTION__ << ": " << size << "\n"; + std::cout << "sizeof(item) " << sizeof(item) << "\n"; + std::cout << "key.size " << insertion.key.key().size() << "\n"; + std::cout << "value.size " << insertion.data.size() << "\n"; + std::cout << "ascii_prefix.size " << insertion.ascii_prefix.size() << "\n"; + } +#endif + return size; + } + + template + void erase(item& item_ref) { if (IsInCache) { _cache.erase(_cache.iterator_to(item_ref)); } if (IsInTimerList) { - _alive.remove(item_ref); + if (item_ref._expiry.ever_expires()) { + _alive.remove(item_ref); + } + } + _stats._bytes -= item_size(item_ref); + if (Release) { + // memory used by item shouldn't be freed when slab is replacing it with another item. + intrusive_ptr_release(&item_ref); } - this->do_erase(item_ref); - this->_stats._bytes -= item_footprint(item_ref); - intrusive_ptr_release(&item_ref); } void expire() { @@ -773,52 +393,51 @@ private: auto item = &*exp.begin(); exp.pop_front(); erase(*item); - this->_stats._expired++; + _stats._expired++; } _timer.arm(_alive.get_next_timeout()); } inline cache_iterator find(const item_key& key) { - return _cache.find(key, std::hash(), item_key_cmp()); + return _cache.find(key, std::hash(), item_key_cmp()); } template inline cache_iterator add_overriding(cache_iterator i, item_insertion_data& insertion) { auto& old_item = *i; - - auto new_item = new item_type(Origin::move_if_local(insertion.key), Origin::move_if_local(insertion.ascii_prefix), - Origin::move_if_local(insertion.data), insertion.expiry, old_item._version + 1); - intrusive_ptr_add_ref(new_item); + uint64_t old_item_version = old_item._version; erase(old_item); + size_t size = item_size(insertion); + auto new_item = slab->create(size, Origin::move_if_local(insertion.key), Origin::move_if_local(insertion.ascii_prefix), + Origin::move_if_local(insertion.data), insertion.expiry, old_item_version + 1); + intrusive_ptr_add_ref(new_item); + auto insert_result = _cache.insert(*new_item); assert(insert_result.second); - if (_alive.insert(*new_item)) { + if (insertion.expiry.ever_expires() && _alive.insert(*new_item)) { _timer.rearm(new_item->get_timeout()); } - this->_lru.push_front(*new_item); - this->do_set(*new_item); - this->_stats._bytes += item_footprint(*new_item); + _stats._bytes += size; return insert_result.first; } template inline void add_new(item_insertion_data& insertion) { - auto new_item = new item_type(Origin::move_if_local(insertion.key), Origin::move_if_local(insertion.ascii_prefix), + size_t size = item_size(insertion); + auto new_item = slab->create(size, Origin::move_if_local(insertion.key), Origin::move_if_local(insertion.ascii_prefix), Origin::move_if_local(insertion.data), insertion.expiry); intrusive_ptr_add_ref(new_item); auto& item_ref = *new_item; _cache.insert(item_ref); - if (_alive.insert(item_ref)) { + if (insertion.expiry.ever_expires() && _alive.insert(item_ref)) { _timer.rearm(item_ref.get_timeout()); } - this->_lru.push_front(*new_item); - this->do_set(*new_item); - this->_stats._bytes += item_footprint(item_ref); + _stats._bytes += size; maybe_rehash(); } @@ -827,10 +446,9 @@ private: auto new_size = _cache.bucket_count() * 2; auto old_buckets = _buckets; try { - _buckets = new cache_bucket[new_size]; + _buckets = new cache_type::bucket_type[new_size]; } catch (const std::bad_alloc& e) { - this->_stats._resize_failure++; - evict(100); // In order to amortize the cost of resize failure + _stats._resize_failure++; return; } _cache.rehash(typename cache_type::bucket_traits(_buckets, new_size)); @@ -838,76 +456,33 @@ private: _resize_up_threshold = _cache.bucket_count() * load_factor; } } - - // Evicts at most @count items. - void evict(size_t count) { - while (!this->_lru.empty() && count--) { - erase(this->_lru.back()); - this->_stats._evicted++; - } - } - - void reclaim(size_t target) { - size_t reclaimed_so_far = 0; - this->_stats._reclaims++; - - reclaimed_so_far += this->do_reclaim(target); - if (reclaimed_so_far >= target) { - return; - } - - auto i = this->_lru.end(); - if (i == this->_lru.begin()) { - return; - } - - --i; - - bool done = false; - do { - item_type& victim = *i; - if (i != this->_lru.begin()) { - --i; - } else { - done = true; - } - - // If the item is shared, we can not assume that removing it from - // cache would cause the memory to be reclaimed in a timely manner - // so we reclaim only items which are not shared. - if (victim._ref_count == 1) { - reclaimed_so_far += item_footprint(victim); - erase(victim); - this->_stats._evicted++; - - if (reclaimed_so_far >= target) { - done = true; - } - } - } while (!done); - } public: - cache() - : _buckets(new cache_bucket[initial_bucket_count]) - , _cache(typename cache_type::bucket_traits(_buckets, initial_bucket_count)) - , _reclaimer([this] { reclaim(5*MB); }) + cache(uint64_t per_cpu_slab_size, uint64_t slab_page_size) + : _buckets(new cache_type::bucket_type[initial_bucket_count]) + , _cache(cache_type::bucket_traits(_buckets, initial_bucket_count)) { _timer.set_callback([this] { expire(); }); _flush_timer.set_callback([this] { flush_all(); }); + + // initialize per-thread slab allocator. + slab = new slab_allocator(default_slab_growth_factor, per_cpu_slab_size, slab_page_size, + [this](item& item_ref) { erase(item_ref); _stats._evicted++; }); +#ifdef __DEBUG__ + static bool print_slab_classes = true; + if (print_slab_classes) { + print_slab_classes = false; + slab->print_slab_classes(); + } +#endif } ~cache() { flush_all(); } - future<> setup(foreign_ptr> dev, uint64_t offset, uint64_t length) { - this->do_setup(std::move(dev), offset, length); - return make_ready_future<>(); - } - void flush_all() { _flush_timer.cancel(); - _cache.erase_and_dispose(_cache.begin(), _cache.end(), [this] (item_type* it) { + _cache.erase_and_dispose(_cache.begin(), _cache.end(), [this] (item* it) { erase(*it); }); } @@ -921,19 +496,15 @@ public: auto i = find(insertion.key); if (i != _cache.end()) { add_overriding(i, insertion); - this->_stats._set_replaces++; + _stats._set_replaces++; return true; } else { add_new(insertion); - this->_stats._set_adds++; + _stats._set_adds++; return false; } } - bool remote_set(item_insertion_data& insertion) { - return set(insertion); - } - template bool add(item_insertion_data& insertion) { auto i = find(insertion.key); @@ -941,15 +512,11 @@ public: return false; } - this->_stats._set_adds++; + _stats._set_adds++; add_new(insertion); return true; } - bool remote_add(item_insertion_data& insertion) { - return add(insertion); - } - template bool replace(item_insertion_data& insertion) { auto i = find(insertion.key); @@ -957,62 +524,51 @@ public: return false; } - this->_stats._set_replaces++; + _stats._set_replaces++; add_overriding(i, insertion); return true; } - bool remote_replace(item_insertion_data& insertion) { - return replace(insertion); - } - bool remove(const item_key& key) { auto i = find(key); if (i == _cache.end()) { - this->_stats._delete_misses++; + _stats._delete_misses++; return false; } - this->_stats._delete_hits++; + _stats._delete_hits++; auto& item_ref = *i; erase(item_ref); return true; } - future> get(const item_key& key) { + item_ptr get(const item_key& key) { auto i = find(key); if (i == _cache.end()) { - this->_stats._get_misses++; - return make_ready_future>(nullptr); + _stats._get_misses++; + return nullptr; } - this->_stats._get_hits++; + _stats._get_hits++; auto& item_ref = *i; - auto item = boost::intrusive_ptr(&item_ref); - return this->do_get(item).then([item] { - return make_ready_future>(make_foreign(item)); - }); + return item_ptr(&item_ref); } template - cas_result cas(item_insertion_data& insertion, typename item_type::version_type version) { + cas_result cas(item_insertion_data& insertion, item::version_type version) { auto i = find(insertion.key); if (i == _cache.end()) { - this->_stats._cas_misses++; + _stats._cas_misses++; return cas_result::not_found; } auto& item_ref = *i; if (item_ref._version != version) { - this->_stats._cas_badval++; + _stats._cas_badval++; return cas_result::bad_version; } - this->_stats._cas_hits++; + _stats._cas_hits++; add_overriding(i, insertion); return cas_result::stored; } - cas_result remote_cas(item_insertion_data& insertion, typename item_type::version_type version) { - return cas(insertion, version); - } - size_t size() { return _cache.size(); } @@ -1022,62 +578,54 @@ public: } cache_stats stats() { - this->_stats._size = size(); - return this->_stats; + _stats._size = size(); + return _stats; } template - std::pair, bool> incr(item_key& key, uint64_t delta) { + std::pair incr(item_key& key, uint64_t delta) { auto i = find(key); if (i == _cache.end()) { - this->_stats._incr_misses++; - return {item_ptr{}, false}; + _stats._incr_misses++; + return {item_ptr{}, false}; } auto& item_ref = *i; - this->_stats._incr_hits++; + _stats._incr_hits++; auto value = item_ref.data_as_integral(); if (!value) { - return {boost::intrusive_ptr(&item_ref), false}; + return {boost::intrusive_ptr(&item_ref), false}; } item_insertion_data insertion { .key = Origin::move_if_local(key), - .ascii_prefix = item_ref._ascii_prefix, + .ascii_prefix = sstring(item_ref.ascii_prefix().data(), item_ref.ascii_prefix_size()), .data = to_sstring(*value + delta), .expiry = item_ref._expiry }; i = add_overriding(i, insertion); - return {boost::intrusive_ptr(&*i), true}; - } - - std::pair, bool> remote_incr(item_key& key, uint64_t delta) { - return incr(key, delta); + return {boost::intrusive_ptr(&*i), true}; } template - std::pair, bool> decr(item_key& key, uint64_t delta) { + std::pair decr(item_key& key, uint64_t delta) { auto i = find(key); if (i == _cache.end()) { - this->_stats._decr_misses++; - return {item_ptr{}, false}; + _stats._decr_misses++; + return {item_ptr{}, false}; } auto& item_ref = *i; - this->_stats._decr_hits++; + _stats._decr_hits++; auto value = item_ref.data_as_integral(); if (!value) { - return {boost::intrusive_ptr(&item_ref), false}; + return {boost::intrusive_ptr(&item_ref), false}; } item_insertion_data insertion { .key = Origin::move_if_local(key), - .ascii_prefix = item_ref._ascii_prefix, + .ascii_prefix = sstring(item_ref.ascii_prefix().data(), item_ref.ascii_prefix_size()), .data = to_sstring(*value - std::min(*value, delta)), .expiry = item_ref._expiry }; i = add_overriding(i, insertion); - return {boost::intrusive_ptr(&*i), true}; - } - - std::pair, bool> remote_decr(item_key& key, uint64_t delta) { - return decr(key, delta); + return {boost::intrusive_ptr(&*i), true}; } std::pair>> print_hash_stats() { @@ -1124,24 +672,23 @@ public: future<> stop() { return make_ready_future<>(); } }; -template class sharded_cache { private: - distributed>& _peers; + distributed& _peers; inline unsigned get_cpu(const item_key& key) { return std::hash()(key) % smp::count; } public: - sharded_cache(distributed>& peers) : _peers(peers) {} + sharded_cache(distributed& peers) : _peers(peers) {} future<> flush_all() { - return _peers.invoke_on_all(&cache::flush_all); + return _peers.invoke_on_all(&cache::flush_all); } future<> flush_at(clock_type::time_point time_point) { - return _peers.invoke_on_all(&cache::flush_at, time_point); + return _peers.invoke_on_all(&cache::flush_at, time_point); } // The caller must keep @insertion live until the resulting future resolves. @@ -1150,7 +697,7 @@ public: if (engine().cpu_id() == cpu) { return make_ready_future(_peers.local().set(insertion)); } - return _peers.invoke_on(cpu, &cache::remote_set, std::ref(insertion)); + return _peers.invoke_on(cpu, &cache::set, std::ref(insertion)); } // The caller must keep @insertion live until the resulting future resolves. @@ -1159,7 +706,7 @@ public: if (engine().cpu_id() == cpu) { return make_ready_future(_peers.local().add(insertion)); } - return _peers.invoke_on(cpu, &cache::remote_add, std::ref(insertion)); + return _peers.invoke_on(cpu, &cache::add, std::ref(insertion)); } // The caller must keep @insertion live until the resulting future resolves. @@ -1168,52 +715,52 @@ public: if (engine().cpu_id() == cpu) { return make_ready_future(_peers.local().replace(insertion)); } - return _peers.invoke_on(cpu, &cache::remote_replace, std::ref(insertion)); + return _peers.invoke_on(cpu, &cache::replace, std::ref(insertion)); } // The caller must keep @key live until the resulting future resolves. future remove(const item_key& key) { auto cpu = get_cpu(key); - return _peers.invoke_on(cpu, &cache::remove, std::ref(key)); + return _peers.invoke_on(cpu, &cache::remove, std::ref(key)); } // The caller must keep @key live until the resulting future resolves. - future> get(const item_key& key) { + future get(const item_key& key) { auto cpu = get_cpu(key); - return _peers.invoke_on(cpu, &cache::get, std::ref(key)); + return _peers.invoke_on(cpu, &cache::get, std::ref(key)); } // The caller must keep @insertion live until the resulting future resolves. - future cas(item_insertion_data& insertion, typename item::version_type version) { + future cas(item_insertion_data& insertion, item::version_type version) { auto cpu = get_cpu(insertion.key); if (engine().cpu_id() == cpu) { return make_ready_future(_peers.local().cas(insertion, version)); } - return _peers.invoke_on(cpu, &cache::remote_cas, std::ref(insertion), std::move(version)); + return _peers.invoke_on(cpu, &cache::cas, std::ref(insertion), std::move(version)); } future stats() { - return _peers.map_reduce(adder(), &cache::stats); + return _peers.map_reduce(adder(), &cache::stats); } // The caller must keep @key live until the resulting future resolves. - future, bool>> incr(item_key& key, uint64_t delta) { + future> incr(item_key& key, uint64_t delta) { auto cpu = get_cpu(key); if (engine().cpu_id() == cpu) { - return make_ready_future, bool>>( - _peers.local().incr(key, delta)); + return make_ready_future>( + _peers.local().incr(key, delta)); } - return _peers.invoke_on(cpu, &cache::remote_incr, std::ref(key), std::move(delta)); + return _peers.invoke_on(cpu, &cache::incr, std::ref(key), std::move(delta)); } // The caller must keep @key live until the resulting future resolves. - future, bool>> decr(item_key& key, uint64_t delta) { + future> decr(item_key& key, uint64_t delta) { auto cpu = get_cpu(key); if (engine().cpu_id() == cpu) { - return make_ready_future, bool>>( + return make_ready_future>( _peers.local().decr(key, delta)); } - return _peers.invoke_on(cpu, &cache::remote_decr, std::ref(key), std::move(delta)); + return _peers.invoke_on(cpu, &cache::decr, std::ref(key), std::move(delta)); } future<> print_hash_stats(output_stream& out) { @@ -1222,7 +769,7 @@ public: .then([&out, str = std::move(data.second)] { return out.write(*str); }); - }, &cache::print_hash_stats); + }, &cache::print_hash_stats); } }; @@ -1254,18 +801,16 @@ public: future<> stop() { return make_ready_future<>(); } }; -template class ascii_protocol { private: - using this_type = ascii_protocol; - sharded_cache& _cache; + using this_type = ascii_protocol; + sharded_cache& _cache; distributed& _system_stats; memcache_ascii_parser _parser; item_key _item_key; item_insertion_data _insertion; - std::vector> _items; + std::vector _items; private: - static constexpr uint32_t seconds_in_a_month = 60 * 60 * 24 * 30; static constexpr const char *msg_crlf = "\r\n"; static constexpr const char *msg_error = "ERROR\r\n"; static constexpr const char *msg_stored = "STORED\r\n"; @@ -1278,10 +823,11 @@ private: static constexpr const char *msg_version = "VERSION " VERSION_STRING "\r\n"; static constexpr const char *msg_exists = "EXISTS\r\n"; static constexpr const char *msg_stat = "STAT "; + static constexpr const char *msg_out_of_memory = "SERVER_ERROR Out of memory allocating new item\r\n"; static constexpr const char *msg_error_non_numeric_value = "CLIENT_ERROR cannot increment or decrement non-numeric value\r\n"; private: template - static void append_item(scattered_message& msg, item_ptr item) { + static void append_item(scattered_message& msg, item_ptr item) { if (!item) { return; } @@ -1296,7 +842,7 @@ private: } msg.append_static(msg_crlf); - msg.append_static(item->data()); + msg.append_static(item->value()); msg.append_static(msg_crlf); msg.on_delete([item = std::move(item)] {}); } @@ -1344,75 +890,75 @@ private: auto now = clock_type::now(); auto total_items = all_cache_stats._set_replaces + all_cache_stats._set_adds + all_cache_stats._cas_hits; - return this->print_stat(out, "pid", getpid()) + return print_stat(out, "pid", getpid()) .then([this, now, &out, uptime = now - all_system_stats._start_time] { - return this->print_stat(out, "uptime", + return print_stat(out, "uptime", std::chrono::duration_cast(uptime).count()); }).then([this, now, &out] { - return this->print_stat(out, "time", + return print_stat(out, "time", std::chrono::duration_cast(now.time_since_epoch()).count()); }).then([this, &out] { - return this->print_stat(out, "version", VERSION_STRING); + return print_stat(out, "version", VERSION_STRING); }).then([this, &out] { - return this->print_stat(out, "pointer_size", sizeof(void*)*8); + return print_stat(out, "pointer_size", sizeof(void*)*8); }).then([this, &out, v = all_system_stats._curr_connections] { - return this->print_stat(out, "curr_connections", v); + return print_stat(out, "curr_connections", v); }).then([this, &out, v = all_system_stats._total_connections] { - return this->print_stat(out, "total_connections", v); + return print_stat(out, "total_connections", v); }).then([this, &out, v = all_system_stats._curr_connections] { - return this->print_stat(out, "connection_structures", v); + return print_stat(out, "connection_structures", v); }).then([this, &out, v = all_system_stats._cmd_get] { - return this->print_stat(out, "cmd_get", v); + return print_stat(out, "cmd_get", v); }).then([this, &out, v = all_system_stats._cmd_set] { - return this->print_stat(out, "cmd_set", v); + return print_stat(out, "cmd_set", v); }).then([this, &out, v = all_system_stats._cmd_flush] { - return this->print_stat(out, "cmd_flush", v); + return print_stat(out, "cmd_flush", v); }).then([this, &out] { - return this->print_stat(out, "cmd_touch", 0); + return print_stat(out, "cmd_touch", 0); }).then([this, &out, v = all_cache_stats._get_hits] { - return this->print_stat(out, "get_hits", v); + return print_stat(out, "get_hits", v); }).then([this, &out, v = all_cache_stats._get_misses] { - return this->print_stat(out, "get_misses", v); + return print_stat(out, "get_misses", v); }).then([this, &out, v = all_cache_stats._delete_misses] { - return this->print_stat(out, "delete_misses", v); + return print_stat(out, "delete_misses", v); }).then([this, &out, v = all_cache_stats._delete_hits] { - return this->print_stat(out, "delete_hits", v); + return print_stat(out, "delete_hits", v); }).then([this, &out, v = all_cache_stats._incr_misses] { - return this->print_stat(out, "incr_misses", v); + return print_stat(out, "incr_misses", v); }).then([this, &out, v = all_cache_stats._incr_hits] { - return this->print_stat(out, "incr_hits", v); + return print_stat(out, "incr_hits", v); }).then([this, &out, v = all_cache_stats._decr_misses] { - return this->print_stat(out, "decr_misses", v); + return print_stat(out, "decr_misses", v); }).then([this, &out, v = all_cache_stats._decr_hits] { - return this->print_stat(out, "decr_hits", v); + return print_stat(out, "decr_hits", v); }).then([this, &out, v = all_cache_stats._cas_misses] { - return this->print_stat(out, "cas_misses", v); + return print_stat(out, "cas_misses", v); }).then([this, &out, v = all_cache_stats._cas_hits] { - return this->print_stat(out, "cas_hits", v); + return print_stat(out, "cas_hits", v); }).then([this, &out, v = all_cache_stats._cas_badval] { - return this->print_stat(out, "cas_badval", v); + return print_stat(out, "cas_badval", v); }).then([this, &out] { - return this->print_stat(out, "touch_hits", 0); + return print_stat(out, "touch_hits", 0); }).then([this, &out] { - return this->print_stat(out, "touch_misses", 0); + return print_stat(out, "touch_misses", 0); }).then([this, &out] { - return this->print_stat(out, "auth_cmds", 0); + return print_stat(out, "auth_cmds", 0); }).then([this, &out] { - return this->print_stat(out, "auth_errors", 0); + return print_stat(out, "auth_errors", 0); }).then([this, &out] { - return this->print_stat(out, "threads", smp::count); + return print_stat(out, "threads", smp::count); }).then([this, &out, v = all_cache_stats._size] { - return this->print_stat(out, "curr_items", v); + return print_stat(out, "curr_items", v); }).then([this, &out, v = total_items] { - return this->print_stat(out, "total_items", v); + return print_stat(out, "total_items", v); }).then([this, &out, v = all_cache_stats._expired] { - return this->print_stat(out, "seastar.expired", v); + return print_stat(out, "seastar.expired", v); }).then([this, &out, v = all_cache_stats._resize_failure] { - return this->print_stat(out, "seastar.resize_failure", v); + return print_stat(out, "seastar.resize_failure", v); }).then([this, &out, v = all_cache_stats._evicted] { - return this->print_stat(out, "evictions", v); + return print_stat(out, "evictions", v); }).then([this, &out, v = all_cache_stats._bytes] { - return this->print_stat(out, "bytes", v); + return print_stat(out, "bytes", v); }).then([&out] { return out.write(msg_end); }); @@ -1420,27 +966,17 @@ private: }); } public: - ascii_protocol(sharded_cache& cache, distributed& system_stats) + ascii_protocol(sharded_cache& cache, distributed& system_stats) : _cache(cache) , _system_stats(system_stats) {} - clock_type::time_point seconds_to_time_point(uint32_t seconds) { - if (seconds == 0) { - return clock_type::time_point::max(); - } else if (seconds <= seconds_in_a_month) { - return clock_type::now() + std::chrono::seconds(seconds); - } else { - return clock_type::time_point(std::chrono::seconds(seconds)); - } - } - void prepare_insertion() { _insertion = item_insertion_data{ .key = std::move(_parser._key), .ascii_prefix = make_sstring(" ", _parser._flags_str, " ", _parser._size_str), .data = std::move(_parser._blob), - .expiry = seconds_to_time_point(_parser._expiration) + .expiry = expiration(_parser._expiration) }; } @@ -1536,7 +1072,8 @@ public: { _system_stats.local()._cmd_flush++; if (_parser._expiration) { - auto f = _cache.flush_at(seconds_to_time_point(_parser._expiration)); + auto expiry = expiration(_parser._expiration); + auto f = _cache.flush_at(expiry.to_time_point()); if (_parser._noreply) { return f; } @@ -1578,7 +1115,7 @@ public: if (!incremented) { return out.write(msg_error_non_numeric_value); } - return out.write(item->data()).then([&out] { + return out.write(item->value().data(), item->value_size()).then([&out] { return out.write(msg_crlf); }); }); @@ -1599,23 +1136,34 @@ public: if (!decremented) { return out.write(msg_error_non_numeric_value); } - return out.write(item->data()).then([&out] { + return out.write(item->value().data(), item->value_size()).then([&out] { return out.write(msg_crlf); }); }); } }; std::abort(); + }).rescue([this, &out] (auto get_ex) -> future<> { + // FIXME: rescue being scheduled even though no exception was triggered has a + // performance cost of about 2.6%. Not using it means maintainability penalty. + try { + get_ex(); + } catch (std::bad_alloc& e) { + if (_parser._noreply) { + return make_ready_future<>(); + } + return out.write(msg_out_of_memory); + } + return make_ready_future<>(); }); }; }; -template class udp_server { public: static const size_t default_max_datagram_size = 1400; private: - sharded_cache& _cache; + sharded_cache& _cache; distributed& _system_stats; udp_channel _chan; uint16_t _port; @@ -1639,10 +1187,10 @@ private: input_stream _in; output_stream _out; std::vector _out_bufs; - ascii_protocol _proto; + ascii_protocol _proto; connection(ipv4_addr src, uint16_t request_id, input_stream&& in, size_t out_size, - sharded_cache& c, distributed& system_stats) + sharded_cache& c, distributed& system_stats) : _src(src) , _request_id(request_id) , _in(std::move(in)) @@ -1664,7 +1212,7 @@ private: }; public: - udp_server(sharded_cache& c, distributed& system_stats, uint16_t port = 11211) + udp_server(sharded_cache& c, distributed& system_stats, uint16_t port = 11211) : _cache(c) , _system_stats(system_stats) , _port(port) @@ -1712,11 +1260,10 @@ public: future<> stop() { return make_ready_future<>(); } }; -template class tcp_server { private: lw_shared_ptr _listener; - sharded_cache& _cache; + sharded_cache& _cache; distributed& _system_stats; uint16_t _port; struct connection { @@ -1724,9 +1271,9 @@ private: socket_address _addr; input_stream _in; output_stream _out; - ascii_protocol _proto; + ascii_protocol _proto; distributed& _system_stats; - connection(connected_socket&& socket, socket_address addr, sharded_cache& c, distributed& system_stats) + connection(connected_socket&& socket, socket_address addr, sharded_cache& c, distributed& system_stats) : _socket(std::move(socket)) , _addr(addr) , _in(_socket.input()) @@ -1742,7 +1289,7 @@ private: } }; public: - tcp_server(sharded_cache& cache, distributed& system_stats, uint16_t port = 11211) + tcp_server(sharded_cache& cache, distributed& system_stats, uint16_t port = 11211) : _cache(cache) , _system_stats(system_stats) , _port(port) @@ -1767,13 +1314,12 @@ public: future<> stop() { return make_ready_future<>(); } }; -template class stats_printer { private: timer<> _timer; - sharded_cache& _cache; + sharded_cache& _cache; public: - stats_printer(sharded_cache& cache) + stats_printer(sharded_cache& cache) : _cache(cache) {} void start() { @@ -1787,11 +1333,6 @@ public: << std::setprecision(2) << std::fixed << "get: " << stats._get_hits << "/" << gets_total << " (" << get_hit_rate << "%) " << "set: " << stats._set_replaces << "/" << sets_total << " (" << set_replace_rate << "%)"; - if (WithFlashCache) { - std::cout << " reclaims: " << stats._reclaims << " " - << "loads: " << stats._loads << " " - << "stores: " << stats._stores << " "; - } std::cout << std::endl; }); }); @@ -1803,26 +1344,23 @@ public: } /* namespace memcache */ -template -int start_instance(int ac, char** av) { - distributed> cache_peers; - memcache::sharded_cache cache(cache_peers); +int main(int ac, char** av) { + distributed cache_peers; + memcache::sharded_cache cache(cache_peers); distributed system_stats; - distributed> udp_server; - distributed> tcp_server; - memcache::stats_printer stats(cache); + distributed udp_server; + distributed tcp_server; + memcache::stats_printer stats(cache); namespace bpo = boost::program_options; app_template app; - if (WithFlashCache) { - app.add_options() - ("device", bpo::value(), - "Flash device") - ; - } app.add_options() - ("max-datagram-size", bpo::value()->default_value(memcache::udp_server::default_max_datagram_size), + ("max-datagram-size", bpo::value()->default_value(memcache::udp_server::default_max_datagram_size), "Maximum size of UDP datagram") + ("max-slab-size", bpo::value()->default_value(memcache::default_per_cpu_slab_size/MB), + "Maximum memory to be used for items (value in megabytes)") + ("slab-page-size", bpo::value()->default_value(memcache::default_slab_page_size/MB), + "Size of slab page (value in megabytes)") ("stats", "Print basic statistics periodically (every second)") ("port", bpo::value()->default_value(11211), @@ -1837,37 +1375,17 @@ int start_instance(int ac, char** av) { auto&& config = app.configuration(); uint16_t port = config["port"].as(); - return cache_peers.start().then([&system_stats] { + uint64_t per_cpu_slab_size = config["max-slab-size"].as() * MB; + uint64_t slab_page_size = config["slab-page-size"].as() * MB; + return cache_peers.start(std::move(per_cpu_slab_size), std::move(slab_page_size)).then([&system_stats] { return system_stats.start(clock_type::now()); }).then([&] { - if (WithFlashCache) { - auto device_path = config["device"].as(); - return engine().open_file_dma(device_path).then([&] (file f) { - auto dev = make_lw_shared({std::move(f)}); - return dev->f().stat().then([&, dev] (struct stat st) mutable { - assert(S_ISBLK(st.st_mode)); - return dev->f().size().then([&, dev] (size_t device_size) mutable { - auto per_cpu_device_size = device_size / smp::count; - std::cout << PLATFORM << " flashcached " << VERSION << "\n"; - std::cout << "device size: " << device_size << " bytes\n"; - std::cout << "per-cpu device size: " << per_cpu_device_size << " bytes\n"; - - for (auto cpu = 0U; cpu < smp::count; cpu++) { - auto offset = cpu * per_cpu_device_size; - cache_peers.invoke_on(cpu, &memcache::cache::setup, - make_foreign(dev), std::move(offset), std::move(per_cpu_device_size)); - } - }); - }); - }); - } else { - std::cout << PLATFORM << " memcached " << VERSION << "\n"; - return make_ready_future<>(); - } + std::cout << PLATFORM << " memcached " << VERSION << "\n"; + return make_ready_future<>(); }).then([&, port] { return tcp_server.start(std::ref(cache), std::ref(system_stats), port); }).then([&tcp_server] { - return tcp_server.invoke_on_all(&memcache::tcp_server::start); + return tcp_server.invoke_on_all(&memcache::tcp_server::start); }).then([&, port] { if (engine().net().has_per_core_namespace()) { return udp_server.start(std::ref(cache), std::ref(system_stats), port); @@ -1875,10 +1393,10 @@ int start_instance(int ac, char** av) { return udp_server.start_single(std::ref(cache), std::ref(system_stats), port); } }).then([&] { - return udp_server.invoke_on_all(&memcache::udp_server::set_max_datagram_size, + return udp_server.invoke_on_all(&memcache::udp_server::set_max_datagram_size, (size_t)config["max-datagram-size"].as()); }).then([&] { - return udp_server.invoke_on_all(&memcache::udp_server::start); + return udp_server.invoke_on_all(&memcache::udp_server::start); }).then([&stats, start_stats = config.count("stats")] { if (start_stats) { stats.start(); @@ -1886,11 +1404,3 @@ int start_instance(int ac, char** av) { }); }); } - -int memcache_instance::run(int ac, char** av) { - return start_instance(ac, av); -} - -int memcache_instance::run(int ac, char** av) { - return start_instance(ac, av); -} diff --git a/apps/memcached/memcached.cc b/apps/memcached/memcached.cc deleted file mode 100644 index 1cb3c1b840..0000000000 --- a/apps/memcached/memcached.cc +++ /dev/null @@ -1,30 +0,0 @@ -/* - * This file is open source software, licensed to you under the terms - * of the Apache License, Version 2.0 (the "License"). See the NOTICE file - * distributed with this work for additional information regarding copyright - * ownership. You may not use this file except in compliance with the License. - * - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -/* - * Copyright 2014 Cloudius Systems - * memcached - */ - -#include "memcached.hh" - -int main(int ac, char** av) -{ - constexpr bool WithFlashCache = false; - memcache_instance instance; - return instance.run(ac, av); -} diff --git a/apps/memcached/memcached.hh b/apps/memcached/memcached.hh index b1492a1bfd..54fa217fcc 100644 --- a/apps/memcached/memcached.hh +++ b/apps/memcached/memcached.hh @@ -20,24 +20,9 @@ #include "core/sstring.hh" -template -class memcache_instance; - -template <> -class memcache_instance { -public: - int run(int ac, char** av); -}; - -template <> -class memcache_instance { -public: - int run(int ac, char** av); -}; - namespace memcache { -template +class item; class cache; class item_key { diff --git a/configure.py b/configure.py index 5e411ad36f..2b45f67ade 100755 --- a/configure.py +++ b/configure.py @@ -122,6 +122,7 @@ urchin_tests = [ 'tests/perf/perf_mutation', 'tests/perf/perf_cql_parser', 'tests/urchin/cql_query_test', + 'tests/test-serialization', ] tests = [ @@ -146,7 +147,8 @@ tests = [ 'tests/allocator_test', 'tests/output_stream_test', 'tests/udp_zero_copy', - 'tests/test-serialization' + 'tests/shared_ptr_test', + 'tests/slab_test' ] + urchin_tests apps = [ @@ -154,7 +156,6 @@ apps = [ 'seastar', 'apps/seawreck/seawreck', 'apps/memcached/memcached', - 'apps/memcached/flashcached', ] all_artifacts = apps + tests @@ -187,7 +188,6 @@ libnet = [ 'net/proxy.cc', 'net/virtio.cc', 'net/dpdk.cc', - 'net/net.cc', 'net/ip.cc', 'net/ethernet.cc', 'net/arp.cc', @@ -210,7 +210,8 @@ core = [ 'util/conversions.cc', 'net/packet.cc', 'net/posix-stack.cc', - 'tests/test_runner.cc' + 'tests/test_runner.cc', + 'net/net.cc', ] defines = [] @@ -243,10 +244,6 @@ memcache_base = [ 'apps/memcached/ascii.rl' ] + libnet + core -memcache = [ - 'apps/memcached/memcache.cc', -] + memcache_base - cassandra_interface = Thrift(source = 'interface/cassandra.thrift', service = 'Cassandra') urchin_core = (['database.cc', @@ -294,8 +291,7 @@ deps = { 'seastar': ['main.cc'] + urchin_core, 'tests/test-reactor': ['tests/test-reactor.cc'] + core, 'apps/httpd/httpd': ['apps/httpd/httpd.cc', 'apps/httpd/request_parser.rl'] + libnet + core, - 'apps/memcached/memcached': ['apps/memcached/memcached.cc'] + memcache, - 'apps/memcached/flashcached': ['apps/memcached/flashcached.cc'] + memcache, + 'apps/memcached/memcached': ['apps/memcached/memcache.cc'] + memcache_base, 'tests/memcached/test_ascii_parser': ['tests/memcached/test_ascii_parser.cc'] + memcache_base, 'tests/fileiotest': ['tests/fileiotest.cc'] + core, 'tests/directory_test': ['tests/directory_test.cc'] + core, @@ -317,7 +313,8 @@ deps = { 'tests/allocator_test': ['tests/allocator_test.cc', 'core/memory.cc', 'core/posix.cc'], 'tests/output_stream_test': ['tests/output_stream_test.cc'] + core + libnet, 'tests/udp_zero_copy': ['tests/udp_zero_copy.cc'] + core + libnet, - 'tests/test-serialization': ['tests/test-serialization.cc'], + 'tests/shared_ptr_test': ['tests/shared_ptr_test.cc'] + core, + 'tests/slab_test': ['tests/slab_test.cc'] + core, } for t in urchin_tests: diff --git a/core/future.hh b/core/future.hh index 1d2de2b008..067b384c57 100644 --- a/core/future.hh +++ b/core/future.hh @@ -47,6 +47,7 @@ public: }; void schedule(std::unique_ptr t); +void engine_exit(std::exception_ptr eptr = {}); template class lambda_task : public task { @@ -584,7 +585,7 @@ public: try { get(); } catch (...) { - std::terminate(); + engine_exit(std::current_exception()); } }); } diff --git a/core/iostream.hh b/core/iostream.hh index b2eb9061d3..7f0956acac 100644 --- a/core/iostream.hh +++ b/core/iostream.hh @@ -52,8 +52,10 @@ class data_source { protected: data_source_impl* impl() const { return _dsi.get(); } public: + data_source() = default; explicit data_source(std::unique_ptr dsi) : _dsi(std::move(dsi)) {} data_source(data_source&& x) = default; + data_source& operator=(data_source&& x) = default; future> get() { return _dsi->get(); } }; @@ -78,8 +80,10 @@ public: class data_sink { std::unique_ptr _dsi; public: + data_sink() = default; explicit data_sink(std::unique_ptr dsi) : _dsi(std::move(dsi)) {} data_sink(data_sink&& x) = default; + data_sink& operator=(data_sink&& x) = default; future<> put(std::vector> data) { return _dsi->put(std::move(data)); } @@ -113,7 +117,10 @@ public: void operator()(tmp_buf data, Done done); }; using char_type = CharType; + input_stream() = default; explicit input_stream(data_source fd, size_t buf_size = 8192) : _fd(std::move(fd)), _buf(0) {} + input_stream(input_stream&&) = default; + input_stream& operator=(input_stream&&) = default; future> read_exactly(size_t n); template future<> consume(Consumer& c); @@ -135,18 +142,21 @@ class output_stream { static_assert(sizeof(CharType) == 1, "must buffer stream of bytes"); data_sink _fd; temporary_buffer _buf; - size_t _size; + size_t _size = 0; size_t _begin = 0; size_t _end = 0; - bool _trim_to_size; + bool _trim_to_size = false; private: size_t available() const { return _end - _begin; } size_t possibly_available() const { return _size - _begin; } future<> split_and_put(temporary_buffer buf); public: using char_type = CharType; + output_stream() = default; output_stream(data_sink fd, size_t size, bool trim_to_size = false) : _fd(std::move(fd)), _size(size), _trim_to_size(trim_to_size) {} + output_stream(output_stream&&) = default; + output_stream& operator=(output_stream&&) = default; future<> write(const char_type* buf, size_t n); future<> write(const char_type* buf); future<> write(const sstring& s); diff --git a/core/posix.hh b/core/posix.hh index 476ec9a24e..7044ca7dde 100644 --- a/core/posix.hh +++ b/core/posix.hh @@ -24,6 +24,7 @@ #include "sstring.hh" #include +#include #include #include #include diff --git a/core/reactor.cc b/core/reactor.cc index 21021128fd..8ce4dff5fc 100644 --- a/core/reactor.cc +++ b/core/reactor.cc @@ -42,6 +42,12 @@ #include #endif #include "prefetch.hh" +#include +#ifdef __GNUC__ +#include +#include +#include +#endif #ifdef HAVE_OSV #include @@ -322,9 +328,9 @@ posix_file_impl::read_dma(uint64_t pos, std::vector iov) { } future -reactor::open_file_dma(sstring name) { - return _thread_pool.submit>([name] { - return wrap_syscall(::open(name.c_str(), O_DIRECT | O_CLOEXEC | O_CREAT | O_RDWR, S_IRWXU)); +reactor::open_file_dma(sstring name, open_flags flags) { + return _thread_pool.submit>([name, flags] { + return wrap_syscall(::open(name.c_str(), O_DIRECT | O_CLOEXEC | static_cast(flags), S_IRWXU)); }).then([] (syscall_result sr) { sr.throw_if_error(); return make_ready_future(file(sr.result)); @@ -1523,3 +1529,47 @@ reactor_backend_osv::enable_timer(clock_type::time_point when) { } #endif + +/** + * engine_exit() exits the reactor. It should be given a pointer to the + * exception which prompted this exit - or a null pointer if the exit + * request was not caused by any exception. + */ +void engine_exit(std::exception_ptr eptr) { + if (!eptr) { + engine().exit(0); + } +#ifndef __GNUC__ + std::cerr << "Exiting on unhandled exception.\n"; +#else + try { + std::rethrow_exception(eptr); + } catch(...) { + auto tp = abi::__cxa_current_exception_type(); + std::cerr << "Exiting on unhandled exception "; + if (tp) { + int status; + char *demangled = abi::__cxa_demangle(tp->name(), 0, 0, &status); + std::cerr << "of type '"; + if (status == 0) { + std::cerr << demangled; + free(demangled); + } else { + std::cerr << tp->name(); + } + std::cerr << "'.\n"; + } else { + std::cerr << "of unknown type.\n"; + } + // Print more information on some known exception types + try { + throw; + } catch(const std::system_error &e) { + std::cerr << "Error " << e.code() << " (" << e.code().message() << ")\n"; + } catch(const std::exception& e) { + std::cerr << e.what() << "\n"; + } + } +#endif + engine().exit(1); +} diff --git a/core/reactor.hh b/core/reactor.hh index 40a7993d40..4eba12bf4d 100644 --- a/core/reactor.hh +++ b/core/reactor.hh @@ -61,6 +61,7 @@ #include "file.hh" #include "semaphore.hh" #include "core/scattered_message.hh" +#include "core/enum.hh" #ifdef HAVE_OSV #include @@ -228,8 +229,11 @@ public: class connected_socket { std::unique_ptr _csi; public: + connected_socket() {}; explicit connected_socket(std::unique_ptr csi) : _csi(std::move(csi)) {} + connected_socket(connected_socket&& cs) = default; + connected_socket& operator=(connected_socket&& cs) = default; input_stream input(); output_stream output(); }; @@ -595,6 +599,17 @@ public: }; #endif /* HAVE_OSV */ +enum class open_flags { + rw = O_RDWR, + ro = O_RDONLY, + wo = O_WRONLY, + create = O_CREAT, +}; + +inline open_flags operator|(open_flags a, open_flags b) { + return open_flags(static_cast(a) | static_cast(b)); +} + class reactor { private: struct pollfn { @@ -723,7 +738,7 @@ public: future<> write_all(pollable_fd_state& fd, const void* buffer, size_t size); - future open_file_dma(sstring name); + future open_file_dma(sstring name, open_flags flags); future open_directory(sstring name); template diff --git a/core/scattered_message.hh b/core/scattered_message.hh index 1b00e661d5..69292e4329 100644 --- a/core/scattered_message.hh +++ b/core/scattered_message.hh @@ -29,6 +29,7 @@ #include "sstring.hh" #include #include +#include template class scattered_message { @@ -62,6 +63,10 @@ public: append_static(s.begin(), s.size()); } + void append_static(const std::experimental::string_view& s) { + append_static(s.data(), s.size()); + } + template void append(basic_sstring s) { if (s.size()) { diff --git a/core/slab.hh b/core/slab.hh new file mode 100644 index 0000000000..9daca2dfc6 --- /dev/null +++ b/core/slab.hh @@ -0,0 +1,336 @@ +/* + * This file is open source software, licensed to you under the terms + * of the Apache License, Version 2.0 (the "License"). See the NOTICE file + * distributed with this work for additional information regarding copyright + * ownership. You may not use this file except in compliance with the License. + * + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +/* + * Copyright 2015 Cloudius Systems + */ +#ifndef __SLAB_ALLOCATOR__ +#define __SLAB_ALLOCATOR__ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include "core/scollectd.hh" +#include "core/align.hh" + +namespace bi = boost::intrusive; + +/* + * Item requirements + * - Extend it to slab_item_base. + * - First parameter of constructor must be uint8_t _slab_class_id. + * - Implement get_slab_class_id() to return _slab_class_id. + * - Implement is_unlocked() to check if Item can be evicted. + */ + +class slab_item_base { + bi::list_member_hook<> _lru_link; + + template + friend class slab_class; +}; + +template +class slab_class { +private: + std::vector _slab_pages; + std::stack _free_objects; + bi::list, &slab_item_base::_lru_link>> _lru; + size_t _size; // size of objects + uint8_t _slab_class_id; +private: + template + inline Item* create_item(void *object, Args&&... args) { + Item *new_item = new(object) Item(_slab_class_id, std::forward(args)...); + _lru.push_front(reinterpret_cast(*new_item)); + return new_item; + } + + inline void* evict_lru_item(std::function& erase_func) { + if (_lru.empty()) { + return nullptr; + } + + Item& victim = reinterpret_cast(_lru.back()); + assert(victim.is_unlocked()); + _lru.erase(_lru.iterator_to(reinterpret_cast(victim))); + // WARNING: You need to make sure that erase_func will not release victim back to slab. + erase_func(victim); + + return reinterpret_cast(&victim); + } +public: + slab_class(size_t size, uint8_t slab_class_id) + : _size(size) + , _slab_class_id(slab_class_id) + { + } + slab_class(slab_class&&) = default; + ~slab_class() { + _lru.clear(); + for (auto& slab : _slab_pages) { + free(slab); + } + } + + size_t size() const { + return _size; + } + + bool empty() const { + return _free_objects.empty(); + } + + bool has_no_slab_pages() const { + return _slab_pages.empty(); + } + + template + Item *create(Args&&... args) { + assert(!_free_objects.empty()); + auto object = _free_objects.top(); + _free_objects.pop(); + + return create_item(object, std::forward(args)...); + } + + template + Item *create_from_new_page(uint64_t max_object_size, Args&&... args) { + constexpr size_t alignment = std::alignment_of::value; + void *slab_page = aligned_alloc(alignment, max_object_size); + if (!slab_page) { + throw std::bad_alloc{}; + } + _slab_pages.push_back(slab_page); + + assert(_size % alignment == 0); + auto objects = max_object_size / _size; + auto object = reinterpret_cast(slab_page); + for (auto i = 1u; i < objects; i++) { + object += _size; + _free_objects.push(object); + } + + // first object from the allocated slab page is returned. + return create_item(slab_page, std::forward(args)...); + } + + template + Item *create_from_lru(std::function& erase_func, Args&&... args) { + auto victim_object = evict_lru_item(erase_func); + if (!victim_object) { + throw std::bad_alloc{}; + } + return create_item(victim_object, std::forward(args)...); + } + + void free_item(Item *item) { + void *object = item; + _lru.erase(_lru.iterator_to(reinterpret_cast(*item))); + _free_objects.push(object); + } + + void touch_item(Item *item) { + auto& item_ref = reinterpret_cast(*item); + _lru.erase(_lru.iterator_to(item_ref)); + _lru.push_front(item_ref); + } + + void remove_item_from_lru(Item *item) { + auto& item_ref = reinterpret_cast(*item); + _lru.erase(_lru.iterator_to(item_ref)); + } + + void insert_item_into_lru(Item *item) { + auto& item_ref = reinterpret_cast(*item); + _lru.push_front(item_ref); + } +}; + +template +class slab_allocator { +private: + std::vector _slab_class_sizes; + std::vector> _slab_classes; + std::vector _registrations; + std::function _erase_func; + uint64_t _max_object_size; + uint64_t _available_slab_pages; + struct collectd_stats { + uint64_t allocs; + uint64_t frees; + } _stats; +private: + void initialize_slab_classes(double growth_factor, uint64_t limit) { + constexpr size_t alignment = std::alignment_of::value; + constexpr size_t initial_size = 96; + size_t size = initial_size; // initial object size + uint8_t slab_class_id = 0U; + + while (_max_object_size / size > 1) { + size = align_up(size, alignment); + _slab_class_sizes.push_back(size); + _slab_classes.emplace_back(size, slab_class_id); + size *= growth_factor; + assert(slab_class_id < std::numeric_limits::max()); + slab_class_id++; + } + _slab_class_sizes.push_back(_max_object_size); + _slab_classes.emplace_back(_max_object_size, slab_class_id); + } + + slab_class* get_slab_class(const size_t size) { + // given a size, find slab class with binary search. + auto i = std::lower_bound(_slab_class_sizes.begin(), _slab_class_sizes.end(), size); + if (i == _slab_class_sizes.end()) { + return nullptr; + } + auto dist = std::distance(_slab_class_sizes.begin(), i); + return &_slab_classes[dist]; + } + + slab_class* get_slab_class(Item* item) { + auto slab_class_id = item->get_slab_class_id(); + assert(slab_class_id >= 0 && slab_class_id < _slab_classes.size()); + return &_slab_classes[slab_class_id]; + } + + void register_collectd_metrics() { + auto add = [this] (auto type_name, auto name, auto data_type, auto func) { + _registrations.push_back( + scollectd::add_polled_metric(scollectd::type_instance_id("slab", + scollectd::per_cpu_plugin_instance, + type_name, name), + scollectd::make_typed(data_type, func))); + }; + + add("total_operations", "malloc", scollectd::data_type::DERIVE, [&] { return _stats.allocs; }); + add("total_operations", "free", scollectd::data_type::DERIVE, [&] { return _stats.frees; }); + add("objects", "malloc", scollectd::data_type::GAUGE, [&] { return _stats.allocs - _stats.frees; }); + } +public: + slab_allocator(double growth_factor, uint64_t limit, uint64_t max_object_size) + : _max_object_size(max_object_size) + , _available_slab_pages(limit / max_object_size) + { + initialize_slab_classes(growth_factor, limit); + register_collectd_metrics(); + } + + slab_allocator(double growth_factor, uint64_t limit, uint64_t max_object_size, std::function erase_func) + : _erase_func(std::move(erase_func)) + , _max_object_size(max_object_size) + , _available_slab_pages(limit / max_object_size) + { + initialize_slab_classes(growth_factor, limit); + register_collectd_metrics(); + } + + ~slab_allocator() + { + _registrations.clear(); + } + + /** + * Create an item from a given slab class based on requested size. + */ + template + Item* create(const size_t size, Args&&... args) { + auto slab_class = get_slab_class(size); + if (!slab_class) { + throw std::bad_alloc{}; + } + + Item *item = nullptr; + if (!slab_class->empty()) { + item = slab_class->create(std::forward(args)...); + _stats.allocs++; + } else { + if (_available_slab_pages > 0 || slab_class->has_no_slab_pages()) { + item = slab_class->create_from_new_page(_max_object_size, std::forward(args)...); + if (_available_slab_pages > 0) { + _available_slab_pages--; + } + _stats.allocs++; + } else if (_erase_func) { + item = slab_class->create_from_lru(_erase_func, std::forward(args)...); + } + } + return item; + } + + void lock_item(Item *item) { + // remove item from the lru of its slab class. + auto slab_class = get_slab_class(item); + slab_class->remove_item_from_lru(item); + } + + void unlock_item(Item *item) { + // insert item into the lru of its slab class. + auto slab_class = get_slab_class(item); + slab_class->insert_item_into_lru(item); + } + + /** + * Free an item back to its original slab class. + */ + void free(Item *item) { + if (item) { + auto slab_class = get_slab_class(item); + slab_class->free_item(item); + _stats.frees++; + } + } + + /** + * Update item position in the LRU of its slab class. + */ + void touch(Item *item) { + if (item) { + auto slab_class = get_slab_class(item); + slab_class->touch_item(item); + } + } + + /** + * Helper function: Print all available slab classes and their respective properties. + */ + void print_slab_classes() { + auto class_id = 0; + for (auto& slab_class : _slab_classes) { + size_t size = slab_class.size(); + printf("slab[%3d]\tsize: %10lu\tper-slab-page: %5lu\n", class_id, size, _max_object_size / size); + class_id++; + } + } + + /** + * Helper function: Useful for getting a slab class' chunk size from a size parameter. + */ + size_t class_size(const size_t size) { + auto slab_class = get_slab_class(size); + return (slab_class) ? slab_class->size() : 0; + } +}; + +#endif /* __SLAB_ALLOCATOR__ */ diff --git a/database.cc b/database.cc index 7c11b273dd..d954c118e8 100644 --- a/database.cc +++ b/database.cc @@ -8,6 +8,8 @@ #include "core/future-util.hh" #include "cql3/column_identifier.hh" +#include +#include thread_local logging::logger dblog("database"); diff --git a/net/api.hh b/net/api.hh index f68652d2b1..d1f75f8035 100644 --- a/net/api.hh +++ b/net/api.hh @@ -33,8 +33,6 @@ #include #include #include -#include -#include class socket_address { public: @@ -58,20 +56,7 @@ struct ipv4_addr { ipv4_addr() : ip(0), port(0) {} ipv4_addr(uint32_t ip, uint16_t port) : ip(ip), port(port) {} ipv4_addr(uint16_t port) : ip(0), port(port) {} - - ipv4_addr(const std::string &addr) { - std::vector items; - boost::split(items, addr, boost::is_any_of(":")); - if (items.size() == 1) { - ip = boost::asio::ip::address_v4::from_string(addr).to_ulong(); - port = 0; - } else if (items.size() == 2) { - ip = boost::asio::ip::address_v4::from_string(items[0]).to_ulong(); - port = std::stoul(items[1]); - } else { - throw std::invalid_argument("invalid format: " + addr); - } - } + ipv4_addr(const std::string &addr); ipv4_addr(const socket_address &sa) { ip = net::ntoh(sa.u.in.sin_addr.s_addr); diff --git a/net/net.cc b/net/net.cc index c9039037e2..4a732446fc 100644 --- a/net/net.cc +++ b/net/net.cc @@ -20,12 +20,28 @@ * */ +#include +#include #include "net.hh" #include #include "toeplitz.hh" using std::move; +ipv4_addr::ipv4_addr(const std::string &addr) { + std::vector items; + boost::split(items, addr, boost::is_any_of(":")); + if (items.size() == 1) { + ip = boost::asio::ip::address_v4::from_string(addr).to_ulong(); + port = 0; + } else if (items.size() == 2) { + ip = boost::asio::ip::address_v4::from_string(items[0]).to_ulong(); + port = std::stoul(items[1]); + } else { + throw std::invalid_argument("invalid format: " + addr); + } +} + namespace net { inline diff --git a/tests/blkdiscard_test.cc b/tests/blkdiscard_test.cc index b40e65b774..cd0a23a7ac 100644 --- a/tests/blkdiscard_test.cc +++ b/tests/blkdiscard_test.cc @@ -42,7 +42,7 @@ int main(int ac, char** av) { auto&& config = app.configuration(); auto filepath = config["dev"].as(); - engine().open_file_dma(filepath).then([] (file f) { + engine().open_file_dma(filepath, open_flags::rw | open_flags::create).then([] (file f) { auto ft = new file_test{std::move(f)}; ft->f.stat().then([ft] (struct stat st) mutable { diff --git a/tests/fileiotest.cc b/tests/fileiotest.cc index a58910ebcf..6f42e1d0c5 100644 --- a/tests/fileiotest.cc +++ b/tests/fileiotest.cc @@ -31,7 +31,7 @@ struct file_test { int main(int ac, char** av) { static constexpr auto max = 10000; - engine().open_file_dma("testfile.tmp").then([] (file f) { + engine().open_file_dma("testfile.tmp", open_flags::rw | open_flags::create).then([] (file f) { auto ft = new file_test{std::move(f)}; for (size_t i = 0; i < max; ++i) { ft->par.wait().then([ft, i] { diff --git a/tests/linecount.cc b/tests/linecount.cc index e035dbc77c..5f15ed8548 100644 --- a/tests/linecount.cc +++ b/tests/linecount.cc @@ -52,7 +52,7 @@ int main(int ac, char** av) { }); app.run(ac, av, [&app] { auto fname = app.configuration()["file"].as(); - engine().open_file_dma(fname).then([] (file f) { + engine().open_file_dma(fname, open_flags::ro | open_flags::create).then([] (file f) { auto r = make_shared(std::move(f)); r->is.consume(*r).then([r] { print("%d lines\n", r->count); diff --git a/apps/memcached/flashcached.cc b/tests/shared_ptr_test.cc similarity index 55% rename from apps/memcached/flashcached.cc rename to tests/shared_ptr_test.cc index 7d47a82051..876cde6cba 100644 --- a/apps/memcached/flashcached.cc +++ b/tests/shared_ptr_test.cc @@ -15,16 +15,38 @@ * specific language governing permissions and limitations * under the License. */ + /* - * Copyright 2014 Cloudius Systems - * flashcached + * Copyright 2015 Cloudius Systems */ -#include "memcached.hh" +#define BOOST_TEST_DYN_LINK +#define BOOST_TEST_MODULE core -int main(int ac, char** av) -{ - constexpr bool WithFlashCache = true; - memcache_instance instance; - return instance.run(ac, av); +#include +#include "core/shared_ptr.hh" + +struct A { + static bool destroyed; + A() { + destroyed = false; + } + virtual ~A() { + destroyed = true; + } +}; + +struct B { + virtual void x() {} +}; + +bool A::destroyed = false; + +BOOST_AUTO_TEST_CASE(explot_dynamic_cast_use_after_free_problem) { + shared_ptr p = ::make_shared(); + { + auto p2 = dynamic_pointer_cast(p); + BOOST_ASSERT(!p2); + } + BOOST_ASSERT(!A::destroyed); } diff --git a/tests/slab_test.cc b/tests/slab_test.cc new file mode 100644 index 0000000000..6b3aa6003d --- /dev/null +++ b/tests/slab_test.cc @@ -0,0 +1,123 @@ +/* + * This file is open source software, licensed to you under the terms + * of the Apache License, Version 2.0 (the "License"). See the NOTICE file + * distributed with this work for additional information regarding copyright + * ownership. You may not use this file except in compliance with the License. + * + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +/* + * Copyright (C) 2015 Cloudius Systems, Ltd. + * + * To compile: g++ -std=c++14 slab_test.cc + */ + +#include +#include +#include "core/slab.hh" + +static constexpr size_t max_object_size = 1024*1024; + +class item : public slab_item_base { +public: + bi::list_member_hook<> _cache_link; + uint8_t _slab_class_id; + + item(uint8_t slab_class_id) : _slab_class_id(slab_class_id) {} + + const uint8_t get_slab_class_id() { + return _slab_class_id; + } + const bool is_unlocked() { + return true; + } +}; + +template +static void free_vector(slab_allocator& slab, std::vector& items) { + for (auto item : items) { + slab.free(item); + } +} + +static void test_allocation_1(const double growth_factor, const unsigned slab_limit_size) { + slab_allocator slab(growth_factor, slab_limit_size, max_object_size); + size_t size = max_object_size; + + slab.print_slab_classes(); + + std::vector items; + + assert(slab_limit_size % size == 0); + for (auto i = 0u; i < (slab_limit_size / size); i++) { + auto item = slab.create(size); + items.push_back(item); + } + assert(slab.create(size) == nullptr); + + free_vector(slab, items); + std::cout << __FUNCTION__ << " done!\n"; +} + +static void test_allocation_2(const double growth_factor, const unsigned slab_limit_size) { + slab_allocator slab(growth_factor, slab_limit_size, max_object_size); + size_t size = 1024; + + std::vector items; + + auto allocations = 0u; + for (;;) { + auto item = slab.create(size); + if (!item) { + break; + } + items.push_back(item); + allocations++; + } + + auto class_size = slab.class_size(size); + auto per_slab_page = max_object_size / class_size; + auto available_slab_pages = slab_limit_size / max_object_size; + assert(allocations == (per_slab_page * available_slab_pages)); + + free_vector(slab, items); + std::cout << __FUNCTION__ << " done!\n"; +} + +static void test_allocation_with_lru(const double growth_factor, const unsigned slab_limit_size) { + bi::list, &item::_cache_link>> _cache; + unsigned evictions = 0; + + slab_allocator slab(growth_factor, slab_limit_size, max_object_size, + [&](item& item_ref) { _cache.erase(_cache.iterator_to(item_ref)); evictions++; }); + size_t size = max_object_size; + + auto max = slab_limit_size / max_object_size; + for (auto i = 0u; i < max * 1000; i++) { + auto item = slab.create(size); + assert(item != nullptr); + _cache.push_front(*item); + } + assert(evictions == max * 999); + + _cache.clear(); + + std::cout << __FUNCTION__ << " done!\n"; +} + +int main(int ac, char** av) { + test_allocation_1(1.25, 5*1024*1024); + test_allocation_2(1.07, 5*1024*1024); // 1.07 is the growth factor used by facebook. + test_allocation_with_lru(1.25, 5*1024*1024); + + return 0; +} diff --git a/util/serialization.hh b/util/serialization.hh index 689f3b385d..c64fc03923 100644 --- a/util/serialization.hh +++ b/util/serialization.hh @@ -30,12 +30,14 @@ class UTFDataFormatException { }; class EOFException { }; +inline void serialize_bool(std::ostream& out, bool b) { out.put(b ? (char)1 : (char)0); } -constexpr size_t serialize_bool_size = 1; +static constexpr size_t serialize_bool_size = 1; +inline bool deserialize_bool(std::istream& in) { char ret; if (in.get(ret)) { @@ -45,20 +47,24 @@ bool deserialize_bool(std::istream& in) { } } +inline void serialize_int8(std::ostream& out, uint8_t val) { out.put(val); } +inline void serialize_int8(std::ostream& out, int8_t val) { out.put(val); } -constexpr size_t serialize_int8_size = 1; +static constexpr size_t serialize_int8_size = 1; +inline void serialize_int8(std::ostream& out, char val) { out.put(val); } +inline int8_t deserialize_int8(std::istream& in) { char ret; if (in.get(ret)) { @@ -68,15 +74,18 @@ int8_t deserialize_int8(std::istream& in) { } } +inline void serialize_int16(std::ostream& out, uint16_t val) { out.put((char)((val >> 8) & 0xFF)); out.put((char)((val >> 0) & 0xFF)); } +inline void serialize_int16(std::ostream& out, int16_t val) { serialize_int16(out, (uint16_t) val); } +inline int16_t deserialize_int16(std::istream& in) { char a1, a2; in.get(a1); @@ -87,8 +96,9 @@ int16_t deserialize_int16(std::istream& in) { return ((int16_t)(uint8_t)a1 << 8) | ((int16_t)(uint8_t)a2 << 0); } -constexpr size_t serialize_int16_size = 2; +static constexpr size_t serialize_int16_size = 2; +inline void serialize_int32(std::ostream& out, uint32_t val) { out.put((char)((val >> 24) & 0xFF)); out.put((char)((val >> 16) & 0xFF)); @@ -96,12 +106,14 @@ void serialize_int32(std::ostream& out, uint32_t val) { out.put((char)((val >> 0) & 0xFF)); } +inline void serialize_int32(std::ostream& out, int32_t val) { serialize_int32(out, (uint32_t) val); } -constexpr size_t serialize_int32_size = 4; +static constexpr size_t serialize_int32_size = 4; +inline int32_t deserialize_int32(std::istream& in) { char a1, a2, a3, a4; in.get(a1); @@ -114,6 +126,7 @@ int32_t deserialize_int32(std::istream& in) { ((int32_t)(uint8_t)a4 << 0); } +inline void serialize_int64(std::ostream& out, uint64_t val) { out.put((char)((val >> 56) & 0xFF)); out.put((char)((val >> 48) & 0xFF)); @@ -125,12 +138,14 @@ void serialize_int64(std::ostream& out, uint64_t val) { out.put((char)((val >> 0) & 0xFF)); } +inline void serialize_int64(std::ostream& out, int64_t val) { serialize_int64(out, (uint64_t) val); } -constexpr size_t serialize_int64_size = 8; +static constexpr size_t serialize_int64_size = 8; +inline int64_t deserialize_int64(std::istream& in) { char a1, a2, a3, a4, a5, a6, a7, a8; in.get(a1); @@ -159,6 +174,7 @@ int64_t deserialize_int64(std::istream& in) { // http://docs.oracle.com/javase/7/docs/api/java/io/DataInput.html#modified-utf-8) // For now we'll just assume those aren't in the string... // TODO: fix the compatibility with Java even in this case. +inline void serialize_string(std::ostream& out, const sstring& s) { // Java specifies that nulls in the string need to be replaced by the // two bytes 0xC0, 0x80. Let's not bother with such transformation @@ -177,12 +193,14 @@ void serialize_string(std::ostream& out, const sstring& s) { out.write(s.c_str(), s.size()); } +inline size_t serialize_string_size(const sstring& s) {; // As above, this code is missing the case of modified utf-8 return serialize_int16_size + s.size(); } +inline void serialize_string(std::ostream& out, const char *s) { // TODO: like above, need to change UTF-8 when above 16-bit. auto len = strlen(s); @@ -195,6 +213,7 @@ void serialize_string(std::ostream& out, const char *s) { out.write(s, len); } +inline sstring deserialize_string(std::istream& in) { int len = deserialize_int16(in); sstring ret(sstring::initialized_later(), len);