From e0bff0c28ec9292ee3c0b13230ff2df7b8bdedf4 Mon Sep 17 00:00:00 2001 From: Tomasz Grabiec Date: Wed, 25 Feb 2015 10:52:53 +0100 Subject: [PATCH 01/15] tests: Add test exploiting dynamic_pointer_cast bug --- configure.py | 2 ++ tests/shared_ptr_test.cc | 52 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 54 insertions(+) create mode 100644 tests/shared_ptr_test.cc diff --git a/configure.py b/configure.py index 8934ba0914..cef642493c 100755 --- a/configure.py +++ b/configure.py @@ -101,6 +101,7 @@ tests = [ 'tests/allocator_test', 'tests/output_stream_test', 'tests/udp_zero_copy', + 'tests/shared_ptr_test', ] apps = [ @@ -228,6 +229,7 @@ deps = { 'tests/allocator_test': ['tests/allocator_test.cc', 'core/memory.cc', 'core/posix.cc'], 'tests/output_stream_test': ['tests/output_stream_test.cc'] + core + libnet, 'tests/udp_zero_copy': ['tests/udp_zero_copy.cc'] + core + libnet, + 'tests/shared_ptr_test': ['tests/shared_ptr_test.cc'] + core, } warnings = [ diff --git a/tests/shared_ptr_test.cc b/tests/shared_ptr_test.cc new file mode 100644 index 0000000000..876cde6cba --- /dev/null +++ b/tests/shared_ptr_test.cc @@ -0,0 +1,52 @@ +/* + * This file is open source software, licensed to you under the terms + * of the Apache License, Version 2.0 (the "License"). See the NOTICE file + * distributed with this work for additional information regarding copyright + * ownership. You may not use this file except in compliance with the License. + * + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/* + * Copyright 2015 Cloudius Systems + */ + +#define BOOST_TEST_DYN_LINK +#define BOOST_TEST_MODULE core + +#include +#include "core/shared_ptr.hh" + +struct A { + static bool destroyed; + A() { + destroyed = false; + } + virtual ~A() { + destroyed = true; + } +}; + +struct B { + virtual void x() {} +}; + +bool A::destroyed = false; + +BOOST_AUTO_TEST_CASE(explot_dynamic_cast_use_after_free_problem) { + shared_ptr p = ::make_shared(); + { + auto p2 = dynamic_pointer_cast(p); + BOOST_ASSERT(!p2); + } + BOOST_ASSERT(!A::destroyed); +} From b64f26832ec7ccc17f0bea5aa6e914b71ee9c7fa Mon Sep 17 00:00:00 2001 From: Nadav Har'El Date: Wed, 25 Feb 2015 11:59:27 +0200 Subject: [PATCH 02/15] reactor: terminate cleanly in or_terminate() The ".or_terminate()" continuation is needed when one needs to exit the application on an unhandled exception, instead of just letting the event loop continue to spin forever. or_terminate() currently calls std::terminate() which only incidentally (as a gcc-specific implementation) prints out the exception's type; It then calls abort(), which results in a painfully slow core dump. This abort() is NOT helpful, because at that point the debugger can only find where abort() was called, not where the original exception was thrown (see issue #32). So instead of calling std::terminate(), this patch switches to calling engine().exit(), to cleanly shut down the application, without dumping core. It also prints, like gcc's std::terminate(), the exception's type. This printing requires non-standard gcc extensions. Signed-off-by: Nadav Har'El --- core/future.hh | 3 ++- core/reactor.cc | 50 +++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 52 insertions(+), 1 deletion(-) diff --git a/core/future.hh b/core/future.hh index 1d2de2b008..067b384c57 100644 --- a/core/future.hh +++ b/core/future.hh @@ -47,6 +47,7 @@ public: }; void schedule(std::unique_ptr t); +void engine_exit(std::exception_ptr eptr = {}); template class lambda_task : public task { @@ -584,7 +585,7 @@ public: try { get(); } catch (...) { - std::terminate(); + engine_exit(std::current_exception()); } }); } diff --git a/core/reactor.cc b/core/reactor.cc index 21021128fd..194db7beac 100644 --- a/core/reactor.cc +++ b/core/reactor.cc @@ -42,6 +42,12 @@ #include #endif #include "prefetch.hh" +#include +#ifdef __GNUC__ +#include +#include +#include +#endif #ifdef HAVE_OSV #include @@ -1523,3 +1529,47 @@ reactor_backend_osv::enable_timer(clock_type::time_point when) { } #endif + +/** + * engine_exit() exits the reactor. It should be given a pointer to the + * exception which prompted this exit - or a null pointer if the exit + * request was not caused by any exception. + */ +void engine_exit(std::exception_ptr eptr) { + if (!eptr) { + engine().exit(0); + } +#ifndef __GNUC__ + std::cerr << "Exiting on unhandled exception.\n"; +#else + try { + std::rethrow_exception(eptr); + } catch(...) { + auto tp = abi::__cxa_current_exception_type(); + std::cerr << "Exiting on unhandled exception "; + if (tp) { + int status; + char *demangled = abi::__cxa_demangle(tp->name(), 0, 0, &status); + std::cerr << "of type '"; + if (status == 0) { + std::cerr << demangled; + free(demangled); + } else { + std::cerr << tp->name(); + } + std::cerr << "'.\n"; + } else { + std::cerr << "of unknown type.\n"; + } + // Print more information on some known exception types + try { + throw; + } catch(const std::system_error &e) { + std::cerr << "Error " << e.code() << " (" << e.code().message() << ")\n"; + } catch(const std::exception& e) { + std::cerr << e.what() << "\n"; + } + } +#endif + engine().exit(1); +} From 11f5a8e3982d4036d44665156da3632172e94906 Mon Sep 17 00:00:00 2001 From: Gleb Natapov Date: Wed, 25 Feb 2015 12:43:47 +0200 Subject: [PATCH 03/15] add default constructors to iostreams and connected_socket Makes life of a programmer easier since it allows to define variables of those types ahead of object creation. --- core/iostream.hh | 14 ++++++++++++-- core/reactor.hh | 3 +++ 2 files changed, 15 insertions(+), 2 deletions(-) diff --git a/core/iostream.hh b/core/iostream.hh index b2eb9061d3..7f0956acac 100644 --- a/core/iostream.hh +++ b/core/iostream.hh @@ -52,8 +52,10 @@ class data_source { protected: data_source_impl* impl() const { return _dsi.get(); } public: + data_source() = default; explicit data_source(std::unique_ptr dsi) : _dsi(std::move(dsi)) {} data_source(data_source&& x) = default; + data_source& operator=(data_source&& x) = default; future> get() { return _dsi->get(); } }; @@ -78,8 +80,10 @@ public: class data_sink { std::unique_ptr _dsi; public: + data_sink() = default; explicit data_sink(std::unique_ptr dsi) : _dsi(std::move(dsi)) {} data_sink(data_sink&& x) = default; + data_sink& operator=(data_sink&& x) = default; future<> put(std::vector> data) { return _dsi->put(std::move(data)); } @@ -113,7 +117,10 @@ public: void operator()(tmp_buf data, Done done); }; using char_type = CharType; + input_stream() = default; explicit input_stream(data_source fd, size_t buf_size = 8192) : _fd(std::move(fd)), _buf(0) {} + input_stream(input_stream&&) = default; + input_stream& operator=(input_stream&&) = default; future> read_exactly(size_t n); template future<> consume(Consumer& c); @@ -135,18 +142,21 @@ class output_stream { static_assert(sizeof(CharType) == 1, "must buffer stream of bytes"); data_sink _fd; temporary_buffer _buf; - size_t _size; + size_t _size = 0; size_t _begin = 0; size_t _end = 0; - bool _trim_to_size; + bool _trim_to_size = false; private: size_t available() const { return _end - _begin; } size_t possibly_available() const { return _size - _begin; } future<> split_and_put(temporary_buffer buf); public: using char_type = CharType; + output_stream() = default; output_stream(data_sink fd, size_t size, bool trim_to_size = false) : _fd(std::move(fd)), _size(size), _trim_to_size(trim_to_size) {} + output_stream(output_stream&&) = default; + output_stream& operator=(output_stream&&) = default; future<> write(const char_type* buf, size_t n); future<> write(const char_type* buf); future<> write(const sstring& s); diff --git a/core/reactor.hh b/core/reactor.hh index 40a7993d40..1a3c792734 100644 --- a/core/reactor.hh +++ b/core/reactor.hh @@ -228,8 +228,11 @@ public: class connected_socket { std::unique_ptr _csi; public: + connected_socket() {}; explicit connected_socket(std::unique_ptr csi) : _csi(std::move(csi)) {} + connected_socket(connected_socket&& cs) = default; + connected_socket& operator=(connected_socket&& cs) = default; input_stream input(); output_stream output(); }; From b461b42ce81a326506c7d82277a42b25e982d7d7 Mon Sep 17 00:00:00 2001 From: "Raphael S. Carvalho" Date: Fri, 20 Feb 2015 12:49:02 -0200 Subject: [PATCH 04/15] memcached: Revert flashcached Flashcached integration was done relying on certain similarities between flashcached and memcached. Old assumptions no longer hold. As a result, the current code is terrible to integrate the slab allocator into it, while keeping flashcached alive. This patch reverts flashcached from memcached. It should be re-integrated in the future, but definitely in a better way. --- apps/memcached/flashcached.cc | 30 -- apps/memcached/memcache.cc | 891 ++++++---------------------------- apps/memcached/memcached.cc | 30 -- apps/memcached/memcached.hh | 16 - configure.py | 8 +- 5 files changed, 159 insertions(+), 816 deletions(-) delete mode 100644 apps/memcached/flashcached.cc delete mode 100644 apps/memcached/memcached.cc diff --git a/apps/memcached/flashcached.cc b/apps/memcached/flashcached.cc deleted file mode 100644 index 7d47a82051..0000000000 --- a/apps/memcached/flashcached.cc +++ /dev/null @@ -1,30 +0,0 @@ -/* - * This file is open source software, licensed to you under the terms - * of the Apache License, Version 2.0 (the "License"). See the NOTICE file - * distributed with this work for additional information regarding copyright - * ownership. You may not use this file except in compliance with the License. - * - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -/* - * Copyright 2014 Cloudius Systems - * flashcached - */ - -#include "memcached.hh" - -int main(int ac, char** av) -{ - constexpr bool WithFlashCache = true; - memcache_instance instance; - return instance.run(ac, av); -} diff --git a/apps/memcached/memcache.cc b/apps/memcached/memcache.cc index 962c212c55..116aa7ef82 100644 --- a/apps/memcached/memcache.cc +++ b/apps/memcached/memcache.cc @@ -41,7 +41,6 @@ #include "core/bitops.hh" #include "memcached.hh" #include -#include #define PLATFORM "seastar" #define VERSION "v1.0" @@ -51,188 +50,14 @@ using namespace net; namespace bi = boost::intrusive; - -namespace flashcache { - -constexpr int block_size_shift = 12; -constexpr uint32_t block_size = 1 << block_size_shift; - -struct block { -private: - uint32_t _blk_id; // granularity: block_size -public: - block() = default; - block(uint32_t blk_id) : _blk_id(blk_id) {} - uint64_t get_addr() { return _blk_id * block_size; } -}; - -struct devfile { -private: - file _f; -public: - devfile(file&& f) : _f(std::move(f)) {} - - file& f() { - return _f; - } - - friend class subdevice; -}; - -class subdevice { - foreign_ptr> _dev; - uint64_t _offset; - uint64_t _end; - std::queue _free_blocks; - semaphore _par = { 1000 }; -public: - subdevice(foreign_ptr> dev, uint64_t offset, uint64_t length) - : _dev(std::move(dev)) - , _offset(offset) - , _end(offset + length) - { - auto blks = length / block_size; - for (auto blk_id = 0U; blk_id < blks; blk_id++) { - _free_blocks.push(blk_id); - } - } - - block allocate(void) { - // FIXME: handle better the case where there is no disk space left for allocations. - assert(!_free_blocks.empty()); - block blk = _free_blocks.front(); - _free_blocks.pop(); - return blk; - } - - void free(block blk) { - auto actual_blk_addr = _offset + blk.get_addr(); - assert(actual_blk_addr + block_size <= _end); - // Issue trimming operation on the block being freed. - _dev->_f.discard(actual_blk_addr, block_size).finally([this, blk]() mutable { - _free_blocks.push(blk); - }); - } - - future read(block& blk, void* buffer) { - auto actual_blk_addr = _offset + blk.get_addr(); - assert(actual_blk_addr + block_size <= _end); - return _dev->_f.dma_read(actual_blk_addr, buffer, block_size); - } - - future write(block& blk, const void* buffer) { - auto actual_blk_addr = _offset + blk.get_addr(); - assert(actual_blk_addr + block_size <= _end); - return _dev->_f.dma_write(actual_blk_addr, buffer, block_size); - } - - future<> wait() { - return _par.wait(); - } - - void signal() { - _par.signal(); - } -}; - -} /* namespace flashcache */ - namespace memcache { template using optional = boost::optional; -struct memcache_item_base { - memcache_item_base(uint32_t size) {} -}; - -enum class item_state { - MEM, - TO_MEM_DISK, // transition period from MEM to MEM_DISK - MEM_DISK, - DISK, - ERASED, -}; - -struct flashcache_item_base { -private: - item_state _state = item_state::MEM; - uint32_t _size; - // NOTE: vector must be sorted, i.e. first block of data should be in the front of the list. - std::vector _used_blocks; - flashcache::subdevice* _subdev = nullptr; +class item { public: - semaphore _lookup_sem = { 1 }; - - flashcache_item_base(uint32_t size) : _size(size) {} - - ~flashcache_item_base() { - if (_used_blocks.empty()) { - return; - } - assert(_subdev != nullptr); - // Needed to free used blocks only when the underlying item is destroyed, - // otherwise they could be reused while there is I/O in progress to them. - for (auto& blk : _used_blocks) { - _subdev->free(blk); - } - _used_blocks.clear(); - } - - void set_subdevice(flashcache::subdevice* subdev) { - assert(_subdev == nullptr); - _subdev = subdev; - } - - bool is_present() { - return (_state == item_state::MEM || _state == item_state::TO_MEM_DISK || - _state == item_state::MEM_DISK); - } - - item_state get_state() { - return _state; - } - - void set_state(item_state state) { - _state = state; - } - - uint32_t size() { - return _size; - } - - size_t used_blocks_size() { - return _used_blocks.size(); - } - - void used_blocks_clear() { - _used_blocks.clear(); - } - - bool used_blocks_empty() { - return _used_blocks.empty(); - } - - void used_blocks_resize(size_t new_size) { - _used_blocks.resize(new_size); - } - - flashcache::block used_block(unsigned int index) { - assert(index < _used_blocks.size()); - return _used_blocks[index]; - } - - void use_block(unsigned int index, flashcache::block blk) { - assert(index < _used_blocks.size()); - _used_blocks[index] = blk; - } -}; - -template -class item : public std::conditional::type { -public: - using item_type = item; using version_type = uint64_t; using time_point = clock_type::time_point; using duration = clock_type::duration; @@ -240,7 +65,7 @@ private: using hook_type = bi::unordered_set_member_hook<>; // TODO: align shared data to cache line boundary item_key _key; - sstring _data; + const sstring _data; const sstring _ascii_prefix; version_type _version; int _ref_count; @@ -248,14 +73,10 @@ private: bi::list_member_hook<> _lru_link; bi::list_member_hook<> _timer_link; time_point _expiry; - template friend class cache; - friend class memcache_cache_base; - friend class flashcache_cache_base; public: item(item_key&& key, sstring&& ascii_prefix, sstring&& data, clock_type::time_point expiry, version_type version = 1) - : std::conditional::type(data.size()) - , _key(std::move(key)) + : _key(std::move(key)) , _data(std::move(data)) , _ascii_prefix(std::move(ascii_prefix)) , _version(version) @@ -275,7 +96,7 @@ public: return _version; } - sstring& data() { + const sstring& data() { return _data; } @@ -313,42 +134,39 @@ public: return false; } - friend bool operator==(const item_type &a, const item_type &b) { + friend bool operator==(const item &a, const item &b) { return a._key == b._key; } - friend std::size_t hash_value(const item_type &i) { + friend std::size_t hash_value(const item &i) { return std::hash()(i._key); } - friend inline void intrusive_ptr_add_ref(item_type* it) { + friend inline void intrusive_ptr_add_ref(item* it) { ++it->_ref_count; } - friend inline void intrusive_ptr_release(item_type* it) { + friend inline void intrusive_ptr_release(item* it) { if (--it->_ref_count == 0) { delete it; } } - template friend class item_key_cmp; }; -template struct item_key_cmp { - bool operator()(const item_key& key, const item& it) const { + bool operator()(const item_key& key, const item& it) const { return key == it._key; } - bool operator()(const item& it, const item_key& key) const { + bool operator()(const item& it, const item_key& key) const { return key == it._key; } }; -template -using item_ptr = foreign_ptr>>; +using item_ptr = foreign_ptr>; struct cache_stats { size_t _get_hits {}; @@ -370,9 +188,6 @@ struct cache_stats { size_t _resize_failure {}; size_t _size {}; size_t _reclaims{}; - // flashcache-only stats. - size_t _loads{}; - size_t _stores{}; void operator+=(const cache_stats& o) { _get_hits += o._get_hits; @@ -394,8 +209,6 @@ struct cache_stats { _resize_failure += o._resize_failure; _size += o._size; _reclaims += o._reclaims; - _loads += o._loads; - _stores += o._stores; } }; @@ -426,344 +239,39 @@ struct item_insertion_data { clock_type::time_point expiry; }; -struct memcache_cache_base { +class cache { private: - using item_type = item; - using item_lru_list = bi::list, &item_type::_lru_link>>; - item_lru_list _lru; - cache_stats _stats; -public: - void do_setup(foreign_ptr> dev, uint64_t offset, uint64_t length) {} - - void do_erase(item_type& item_ref) { - _lru.erase(_lru.iterator_to(item_ref)); - } - - size_t do_reclaim(size_t target) { - return 0; - } - - future<> do_get(boost::intrusive_ptr item) { - auto& item_ref = *item; - _lru.erase(_lru.iterator_to(item_ref)); - _lru.push_front(item_ref); - return make_ready_future<>(); - } - - void do_set(item_type& new_item_ref) {} - - template - friend class cache; -}; - -struct flashcache_cache_base { -private: - using item_type = item; - using item_lru_list = bi::list, &item_type::_lru_link>>; - item_lru_list _lru; // mem_lru - item_lru_list _mem_disk_lru; - item_lru_list _disk_lru; - uint64_t _total_mem = 0; // total bytes from items' value in mem lru. - uint64_t _total_mem_disk = 0; // total bytes from items' value in mem_disk lru. - std::unique_ptr _subdev; - cache_stats _stats; - - future<> load_item_data(boost::intrusive_ptr item); - future<> store_item_data(boost::intrusive_ptr item); -public: - flashcache::subdevice& get_subdevice() { - auto& subdev_ref = *_subdev.get(); - return subdev_ref; - } - - void do_setup(foreign_ptr> dev, uint64_t offset, uint64_t length) { - _subdev = std::make_unique(std::move(dev), offset, length); - } - - void do_erase(item_type& item_ref) { - switch(item_ref.get_state()) { - case item_state::MEM: - _lru.erase(_lru.iterator_to(item_ref)); - _total_mem -= item_ref.size(); - break; - case item_state::TO_MEM_DISK: - _total_mem_disk -= item_ref.size(); - break; - case item_state::MEM_DISK: - _mem_disk_lru.erase(_mem_disk_lru.iterator_to(item_ref)); - _total_mem_disk -= item_ref.size(); - break; - case item_state::DISK: - _disk_lru.erase(_disk_lru.iterator_to(item_ref)); - break; - default: - assert(0); - } - item_ref.set_state(item_state::ERASED); - } - - size_t do_reclaim(size_t target) { - size_t reclaimed_so_far = 0; - - auto i = this->_mem_disk_lru.end(); - if (i == this->_mem_disk_lru.begin()) { - return 0; - } - - --i; - - bool done = false; - do { - item_type& victim = *i; - if (i != this->_mem_disk_lru.begin()) { - --i; - } else { - done = true; - } - - if (victim._ref_count == 1) { - auto item_data_size = victim.size(); - - assert(victim.data().size() == item_data_size); - _mem_disk_lru.erase(_mem_disk_lru.iterator_to(victim)); - victim.data().reset(); - assert(victim.data().size() == 0); - victim.set_state(item_state::DISK); - _disk_lru.push_front(victim); - reclaimed_so_far += item_data_size; - _total_mem_disk -= item_data_size; - - if (reclaimed_so_far >= target) { - done = true; - } - } - } while (!done); - return reclaimed_so_far; - } - - future<> do_get(boost::intrusive_ptr item) { - return load_item_data(item); - } - - // TODO: Handle storing/loading of zero-length items. - void do_set(item_type& new_item_ref) { - _total_mem += new_item_ref.size(); - new_item_ref.set_subdevice(_subdev.get()); - - // Adjust items between mem (20%) and mem_disk (80%) lru lists. - // With that ratio, items will be constantly scheduled to be stored on disk, - // and that's good because upon memory pressure, we would have enough items - // to satisfy the amount of memory asked to be reclaimed. - if (_total_mem >= 1*MB) { - auto total = _total_mem + _total_mem_disk; - auto total_mem_disk_perc = _total_mem_disk * 100 / total; - if (total_mem_disk_perc < 80) { - // Store least recently used item from lru into mem_disk lru. - item_type& item_ref = _lru.back(); - auto item = boost::intrusive_ptr(&item_ref); - auto item_data_size = item->size(); - - assert(item->get_state() == item_state::MEM); - _lru.erase(_lru.iterator_to(item_ref)); - item->set_state(item_state::TO_MEM_DISK); - store_item_data(item); - _total_mem -= item_data_size; - _total_mem_disk += item_data_size; - } - } - } - - template - friend class cache; -}; - -// -// Load item data from disk into memory. -// NOTE: blocks used aren't freed because item will be moved to _mem_disk_lru. -// -future<> flashcache_cache_base::load_item_data(boost::intrusive_ptr item) { - if (item->is_present()) { - auto& item_ref = *item; - switch(item->get_state()) { - case item_state::MEM: - _lru.erase(_lru.iterator_to(item_ref)); - _lru.push_front(item_ref); - break; - case item_state::TO_MEM_DISK: - break; - case item_state::MEM_DISK: - _mem_disk_lru.erase(_mem_disk_lru.iterator_to(item_ref)); - _mem_disk_lru.push_front(item_ref); - break; - default: - assert(0); - } - return make_ready_future<>(); - } - return item->_lookup_sem.wait().then([this, item] { - if (item->is_present()) { - return make_ready_future<>(); - } - assert(item->get_state() == item_state::DISK); - - flashcache::subdevice& subdev = this->get_subdevice(); - auto sem = make_lw_shared({ 0 }); - auto& item_data = item->data(); - auto item_size = item->size(); - auto blocks_to_load = item->used_blocks_size(); - assert(item_data.empty()); - assert(item_size >= 1); - assert(blocks_to_load == (item_size + (flashcache::block_size - 1)) / flashcache::block_size); - - auto to_read = item_size; - item_data = sstring(sstring::initialized_later(), item_size); - for (auto i = 0U; i < blocks_to_load; ++i) { - auto read_size = std::min(to_read, flashcache::block_size); - - subdev.wait().then([&subdev, sem, item, read_size, i] { - // If the item is already erased no need to schedule new IOs, just signal the semaphores. - if (item->get_state() == item_state::ERASED) { - return make_ready_future<>(); - } - // TODO: Avoid allocation and copying by directly using item's data (should be aligned). - auto rbuf = allocate_aligned_buffer(flashcache::block_size, flashcache::block_size); - auto rb = rbuf.get(); - flashcache::block blk = item->used_block(i); - - return subdev.read(blk, rb).then( - [item, read_size, rbuf = std::move(rbuf), i] (size_t ret) mutable { - assert(ret == flashcache::block_size); - char *data = item->data().begin(); - assert(data != nullptr); - assert((i * flashcache::block_size + read_size) <= item->data().size()); // overflow check - memcpy(data + (i * flashcache::block_size), rbuf.get(), read_size); - }).or_terminate(); - }).finally([&subdev, sem] { - subdev.signal(); - sem->signal(1); - }); - to_read -= read_size; - } - - return sem->wait(blocks_to_load).then([this, item] () mutable { - auto& item_data = item->data(); - auto item_data_size = item_data.size(); - assert(item_data_size == item->size()); - - if (item->get_state() != item_state::ERASED) { - // Adjusting LRU: item is moved from _disk_lru to _mem_disk_lru. - auto& item_ref = *item; - _disk_lru.erase(_disk_lru.iterator_to(item_ref)); - item->set_state(item_state::MEM_DISK); - _mem_disk_lru.push_front(item_ref); - _total_mem_disk += item_data_size; - } - this->_stats._loads++; - }); - }).finally([item] { - item->_lookup_sem.signal(); - }); -} - -// -// Store item data from memory into disk. -// NOTE: Item data remains present in memory. -// -future<> flashcache_cache_base::store_item_data(boost::intrusive_ptr item) { - assert(item->get_state() == item_state::TO_MEM_DISK); - - flashcache::subdevice& subdev = this->get_subdevice(); - auto sem = make_lw_shared({ 0 }); - auto& item_data = item->data(); - auto item_size = item->size(); - auto blocks_to_store = (item_size + (flashcache::block_size - 1)) / flashcache::block_size; - assert(item_data.size() == item_size); - assert(item->used_blocks_empty()); - assert(blocks_to_store >= 1); - - auto to_write = item_size; - item->used_blocks_resize(blocks_to_store); - for (auto i = 0U; i < blocks_to_store; ++i) { - auto write_size = std::min(to_write, flashcache::block_size); - - subdev.wait().then([&subdev, sem, item, write_size, i] { - if (item->get_state() == item_state::ERASED) { - return make_ready_future<>(); - } - auto wbuf = allocate_aligned_buffer(flashcache::block_size, flashcache::block_size); - const char *data = item->data().c_str(); - assert(data != nullptr); - assert((i * flashcache::block_size + write_size) <= item->data().size()); // overflow check - memcpy(wbuf.get(), data + (i * flashcache::block_size), write_size); - auto wb = wbuf.get(); - flashcache::block blk = subdev.allocate(); - item->use_block(i, blk); - - return subdev.write(blk, wb).then([] (size_t ret) mutable { - assert(ret == flashcache::block_size); - }).or_terminate(); - }).finally([&subdev, sem] { - subdev.signal(); - sem->signal(1); - }); - to_write -= write_size; - } - - return sem->wait(blocks_to_store).then([this, item] () mutable { - // NOTE: Item was removed previously from mem lru so as to avoid races, i.e. - // upon another set, the same item would be popped from the back of the lru. - auto& item_data = item->data(); - auto item_data_size = item_data.size(); - assert(item_data_size == item->size()); - - if (item->get_state() != item_state::ERASED) { - // Adjusting LRU: item is moved from mem lru to mem_disk lru. - auto& item_ref = *item; - item->set_state(item_state::MEM_DISK); - _mem_disk_lru.push_front(item_ref); - } - this->_stats._stores++; - }); -} - -template -class cache : public std::conditional::type { -private: - using item_type = item; - using cache_type = bi::unordered_set, + using cache_type = bi::unordered_set, bi::power_2_buckets, bi::constant_time_size>; using cache_iterator = typename cache_type::iterator; - using cache_bucket = typename cache_type::bucket_type; static constexpr size_t initial_bucket_count = 1 << 10; static constexpr float load_factor = 0.75f; size_t _resize_up_threshold = load_factor * initial_bucket_count; - cache_bucket* _buckets; + cache_type::bucket_type* _buckets; cache_type _cache; - timer_set _alive; + bi::list, &item::_lru_link>> _lru; + timer_set _alive; timer<> _timer; + cache_stats _stats; timer<> _flush_timer; memory::reclaimer _reclaimer; private: - size_t item_footprint(item_type& item_ref) { - return sizeof(item_type) + item_ref._data.size() + item_ref._ascii_prefix.size() + item_ref.key().size(); + size_t item_footprint(item& item_ref) { + return sizeof(item) + item_ref._data.size() + item_ref._ascii_prefix.size() + item_ref.key().size(); } template - void erase(item_type& item_ref) { + void erase(item& item_ref) { if (IsInCache) { _cache.erase(_cache.iterator_to(item_ref)); } if (IsInTimerList) { _alive.remove(item_ref); } - this->do_erase(item_ref); - this->_stats._bytes -= item_footprint(item_ref); + _lru.erase(_lru.iterator_to(item_ref)); + _stats._bytes -= item_footprint(item_ref); intrusive_ptr_release(&item_ref); } @@ -773,14 +281,14 @@ private: auto item = &*exp.begin(); exp.pop_front(); erase(*item); - this->_stats._expired++; + _stats._expired++; } _timer.arm(_alive.get_next_timeout()); } inline cache_iterator find(const item_key& key) { - return _cache.find(key, std::hash(), item_key_cmp()); + return _cache.find(key, std::hash(), item_key_cmp()); } template @@ -788,7 +296,7 @@ private: cache_iterator add_overriding(cache_iterator i, item_insertion_data& insertion) { auto& old_item = *i; - auto new_item = new item_type(Origin::move_if_local(insertion.key), Origin::move_if_local(insertion.ascii_prefix), + auto new_item = new item(Origin::move_if_local(insertion.key), Origin::move_if_local(insertion.ascii_prefix), Origin::move_if_local(insertion.data), insertion.expiry, old_item._version + 1); intrusive_ptr_add_ref(new_item); @@ -799,16 +307,15 @@ private: if (_alive.insert(*new_item)) { _timer.rearm(new_item->get_timeout()); } - this->_lru.push_front(*new_item); - this->do_set(*new_item); - this->_stats._bytes += item_footprint(*new_item); + _lru.push_front(*new_item); + _stats._bytes += item_footprint(*new_item); return insert_result.first; } template inline void add_new(item_insertion_data& insertion) { - auto new_item = new item_type(Origin::move_if_local(insertion.key), Origin::move_if_local(insertion.ascii_prefix), + auto new_item = new item(Origin::move_if_local(insertion.key), Origin::move_if_local(insertion.ascii_prefix), Origin::move_if_local(insertion.data), insertion.expiry); intrusive_ptr_add_ref(new_item); auto& item_ref = *new_item; @@ -816,9 +323,8 @@ private: if (_alive.insert(item_ref)) { _timer.rearm(item_ref.get_timeout()); } - this->_lru.push_front(*new_item); - this->do_set(*new_item); - this->_stats._bytes += item_footprint(item_ref); + _lru.push_front(*new_item); + _stats._bytes += item_footprint(item_ref); maybe_rehash(); } @@ -827,9 +333,9 @@ private: auto new_size = _cache.bucket_count() * 2; auto old_buckets = _buckets; try { - _buckets = new cache_bucket[new_size]; + _buckets = new cache_type::bucket_type[new_size]; } catch (const std::bad_alloc& e) { - this->_stats._resize_failure++; + _stats._resize_failure++; evict(100); // In order to amortize the cost of resize failure return; } @@ -841,23 +347,18 @@ private: // Evicts at most @count items. void evict(size_t count) { - while (!this->_lru.empty() && count--) { - erase(this->_lru.back()); - this->_stats._evicted++; + while (!_lru.empty() && count--) { + erase(_lru.back()); + _stats._evicted++; } } void reclaim(size_t target) { size_t reclaimed_so_far = 0; - this->_stats._reclaims++; + _stats._reclaims++; - reclaimed_so_far += this->do_reclaim(target); - if (reclaimed_so_far >= target) { - return; - } - - auto i = this->_lru.end(); - if (i == this->_lru.begin()) { + auto i = _lru.end(); + if (i == _lru.begin()) { return; } @@ -865,8 +366,8 @@ private: bool done = false; do { - item_type& victim = *i; - if (i != this->_lru.begin()) { + item& victim = *i; + if (i != _lru.begin()) { --i; } else { done = true; @@ -878,7 +379,7 @@ private: if (victim._ref_count == 1) { reclaimed_so_far += item_footprint(victim); erase(victim); - this->_stats._evicted++; + _stats._evicted++; if (reclaimed_so_far >= target) { done = true; @@ -888,8 +389,8 @@ private: } public: cache() - : _buckets(new cache_bucket[initial_bucket_count]) - , _cache(typename cache_type::bucket_traits(_buckets, initial_bucket_count)) + : _buckets(new cache_type::bucket_type[initial_bucket_count]) + , _cache(cache_type::bucket_traits(_buckets, initial_bucket_count)) , _reclaimer([this] { reclaim(5*MB); }) { _timer.set_callback([this] { expire(); }); @@ -900,14 +401,9 @@ public: flush_all(); } - future<> setup(foreign_ptr> dev, uint64_t offset, uint64_t length) { - this->do_setup(std::move(dev), offset, length); - return make_ready_future<>(); - } - void flush_all() { _flush_timer.cancel(); - _cache.erase_and_dispose(_cache.begin(), _cache.end(), [this] (item_type* it) { + _cache.erase_and_dispose(_cache.begin(), _cache.end(), [this] (item* it) { erase(*it); }); } @@ -921,19 +417,15 @@ public: auto i = find(insertion.key); if (i != _cache.end()) { add_overriding(i, insertion); - this->_stats._set_replaces++; + _stats._set_replaces++; return true; } else { add_new(insertion); - this->_stats._set_adds++; + _stats._set_adds++; return false; } } - bool remote_set(item_insertion_data& insertion) { - return set(insertion); - } - template bool add(item_insertion_data& insertion) { auto i = find(insertion.key); @@ -941,15 +433,11 @@ public: return false; } - this->_stats._set_adds++; + _stats._set_adds++; add_new(insertion); return true; } - bool remote_add(item_insertion_data& insertion) { - return add(insertion); - } - template bool replace(item_insertion_data& insertion) { auto i = find(insertion.key); @@ -957,62 +445,53 @@ public: return false; } - this->_stats._set_replaces++; + _stats._set_replaces++; add_overriding(i, insertion); return true; } - bool remote_replace(item_insertion_data& insertion) { - return replace(insertion); - } - bool remove(const item_key& key) { auto i = find(key); if (i == _cache.end()) { - this->_stats._delete_misses++; + _stats._delete_misses++; return false; } - this->_stats._delete_hits++; + _stats._delete_hits++; auto& item_ref = *i; erase(item_ref); return true; } - future> get(const item_key& key) { + item_ptr get(const item_key& key) { auto i = find(key); if (i == _cache.end()) { - this->_stats._get_misses++; - return make_ready_future>(nullptr); + _stats._get_misses++; + return nullptr; } - this->_stats._get_hits++; + _stats._get_hits++; auto& item_ref = *i; - auto item = boost::intrusive_ptr(&item_ref); - return this->do_get(item).then([item] { - return make_ready_future>(make_foreign(item)); - }); + _lru.erase(_lru.iterator_to(item_ref)); + _lru.push_front(item_ref); + return item_ptr(&item_ref); } template - cas_result cas(item_insertion_data& insertion, typename item_type::version_type version) { + cas_result cas(item_insertion_data& insertion, item::version_type version) { auto i = find(insertion.key); if (i == _cache.end()) { - this->_stats._cas_misses++; + _stats._cas_misses++; return cas_result::not_found; } auto& item_ref = *i; if (item_ref._version != version) { - this->_stats._cas_badval++; + _stats._cas_badval++; return cas_result::bad_version; } - this->_stats._cas_hits++; + _stats._cas_hits++; add_overriding(i, insertion); return cas_result::stored; } - cas_result remote_cas(item_insertion_data& insertion, typename item_type::version_type version) { - return cas(insertion, version); - } - size_t size() { return _cache.size(); } @@ -1022,22 +501,22 @@ public: } cache_stats stats() { - this->_stats._size = size(); - return this->_stats; + _stats._size = size(); + return _stats; } template - std::pair, bool> incr(item_key& key, uint64_t delta) { + std::pair incr(item_key& key, uint64_t delta) { auto i = find(key); if (i == _cache.end()) { - this->_stats._incr_misses++; - return {item_ptr{}, false}; + _stats._incr_misses++; + return {item_ptr{}, false}; } auto& item_ref = *i; - this->_stats._incr_hits++; + _stats._incr_hits++; auto value = item_ref.data_as_integral(); if (!value) { - return {boost::intrusive_ptr(&item_ref), false}; + return {boost::intrusive_ptr(&item_ref), false}; } item_insertion_data insertion { .key = Origin::move_if_local(key), @@ -1046,25 +525,21 @@ public: .expiry = item_ref._expiry }; i = add_overriding(i, insertion); - return {boost::intrusive_ptr(&*i), true}; - } - - std::pair, bool> remote_incr(item_key& key, uint64_t delta) { - return incr(key, delta); + return {boost::intrusive_ptr(&*i), true}; } template - std::pair, bool> decr(item_key& key, uint64_t delta) { + std::pair decr(item_key& key, uint64_t delta) { auto i = find(key); if (i == _cache.end()) { - this->_stats._decr_misses++; - return {item_ptr{}, false}; + _stats._decr_misses++; + return {item_ptr{}, false}; } auto& item_ref = *i; - this->_stats._decr_hits++; + _stats._decr_hits++; auto value = item_ref.data_as_integral(); if (!value) { - return {boost::intrusive_ptr(&item_ref), false}; + return {boost::intrusive_ptr(&item_ref), false}; } item_insertion_data insertion { .key = Origin::move_if_local(key), @@ -1073,11 +548,7 @@ public: .expiry = item_ref._expiry }; i = add_overriding(i, insertion); - return {boost::intrusive_ptr(&*i), true}; - } - - std::pair, bool> remote_decr(item_key& key, uint64_t delta) { - return decr(key, delta); + return {boost::intrusive_ptr(&*i), true}; } std::pair>> print_hash_stats() { @@ -1124,24 +595,23 @@ public: future<> stop() { return make_ready_future<>(); } }; -template class sharded_cache { private: - distributed>& _peers; + distributed& _peers; inline unsigned get_cpu(const item_key& key) { return std::hash()(key) % smp::count; } public: - sharded_cache(distributed>& peers) : _peers(peers) {} + sharded_cache(distributed& peers) : _peers(peers) {} future<> flush_all() { - return _peers.invoke_on_all(&cache::flush_all); + return _peers.invoke_on_all(&cache::flush_all); } future<> flush_at(clock_type::time_point time_point) { - return _peers.invoke_on_all(&cache::flush_at, time_point); + return _peers.invoke_on_all(&cache::flush_at, time_point); } // The caller must keep @insertion live until the resulting future resolves. @@ -1150,7 +620,7 @@ public: if (engine().cpu_id() == cpu) { return make_ready_future(_peers.local().set(insertion)); } - return _peers.invoke_on(cpu, &cache::remote_set, std::ref(insertion)); + return _peers.invoke_on(cpu, &cache::set, std::ref(insertion)); } // The caller must keep @insertion live until the resulting future resolves. @@ -1159,7 +629,7 @@ public: if (engine().cpu_id() == cpu) { return make_ready_future(_peers.local().add(insertion)); } - return _peers.invoke_on(cpu, &cache::remote_add, std::ref(insertion)); + return _peers.invoke_on(cpu, &cache::add, std::ref(insertion)); } // The caller must keep @insertion live until the resulting future resolves. @@ -1168,52 +638,52 @@ public: if (engine().cpu_id() == cpu) { return make_ready_future(_peers.local().replace(insertion)); } - return _peers.invoke_on(cpu, &cache::remote_replace, std::ref(insertion)); + return _peers.invoke_on(cpu, &cache::replace, std::ref(insertion)); } // The caller must keep @key live until the resulting future resolves. future remove(const item_key& key) { auto cpu = get_cpu(key); - return _peers.invoke_on(cpu, &cache::remove, std::ref(key)); + return _peers.invoke_on(cpu, &cache::remove, std::ref(key)); } // The caller must keep @key live until the resulting future resolves. - future> get(const item_key& key) { + future get(const item_key& key) { auto cpu = get_cpu(key); - return _peers.invoke_on(cpu, &cache::get, std::ref(key)); + return _peers.invoke_on(cpu, &cache::get, std::ref(key)); } // The caller must keep @insertion live until the resulting future resolves. - future cas(item_insertion_data& insertion, typename item::version_type version) { + future cas(item_insertion_data& insertion, item::version_type version) { auto cpu = get_cpu(insertion.key); if (engine().cpu_id() == cpu) { return make_ready_future(_peers.local().cas(insertion, version)); } - return _peers.invoke_on(cpu, &cache::remote_cas, std::ref(insertion), std::move(version)); + return _peers.invoke_on(cpu, &cache::cas, std::ref(insertion), std::move(version)); } future stats() { - return _peers.map_reduce(adder(), &cache::stats); + return _peers.map_reduce(adder(), &cache::stats); } // The caller must keep @key live until the resulting future resolves. - future, bool>> incr(item_key& key, uint64_t delta) { + future> incr(item_key& key, uint64_t delta) { auto cpu = get_cpu(key); if (engine().cpu_id() == cpu) { - return make_ready_future, bool>>( - _peers.local().incr(key, delta)); + return make_ready_future>( + _peers.local().incr(key, delta)); } - return _peers.invoke_on(cpu, &cache::remote_incr, std::ref(key), std::move(delta)); + return _peers.invoke_on(cpu, &cache::incr, std::ref(key), std::move(delta)); } // The caller must keep @key live until the resulting future resolves. - future, bool>> decr(item_key& key, uint64_t delta) { + future> decr(item_key& key, uint64_t delta) { auto cpu = get_cpu(key); if (engine().cpu_id() == cpu) { - return make_ready_future, bool>>( + return make_ready_future>( _peers.local().decr(key, delta)); } - return _peers.invoke_on(cpu, &cache::remote_decr, std::ref(key), std::move(delta)); + return _peers.invoke_on(cpu, &cache::decr, std::ref(key), std::move(delta)); } future<> print_hash_stats(output_stream& out) { @@ -1222,7 +692,7 @@ public: .then([&out, str = std::move(data.second)] { return out.write(*str); }); - }, &cache::print_hash_stats); + }, &cache::print_hash_stats); } }; @@ -1254,16 +724,15 @@ public: future<> stop() { return make_ready_future<>(); } }; -template class ascii_protocol { private: - using this_type = ascii_protocol; - sharded_cache& _cache; + using this_type = ascii_protocol; + sharded_cache& _cache; distributed& _system_stats; memcache_ascii_parser _parser; item_key _item_key; item_insertion_data _insertion; - std::vector> _items; + std::vector _items; private: static constexpr uint32_t seconds_in_a_month = 60 * 60 * 24 * 30; static constexpr const char *msg_crlf = "\r\n"; @@ -1281,7 +750,7 @@ private: static constexpr const char *msg_error_non_numeric_value = "CLIENT_ERROR cannot increment or decrement non-numeric value\r\n"; private: template - static void append_item(scattered_message& msg, item_ptr item) { + static void append_item(scattered_message& msg, item_ptr item) { if (!item) { return; } @@ -1344,75 +813,75 @@ private: auto now = clock_type::now(); auto total_items = all_cache_stats._set_replaces + all_cache_stats._set_adds + all_cache_stats._cas_hits; - return this->print_stat(out, "pid", getpid()) + return print_stat(out, "pid", getpid()) .then([this, now, &out, uptime = now - all_system_stats._start_time] { - return this->print_stat(out, "uptime", + return print_stat(out, "uptime", std::chrono::duration_cast(uptime).count()); }).then([this, now, &out] { - return this->print_stat(out, "time", + return print_stat(out, "time", std::chrono::duration_cast(now.time_since_epoch()).count()); }).then([this, &out] { - return this->print_stat(out, "version", VERSION_STRING); + return print_stat(out, "version", VERSION_STRING); }).then([this, &out] { - return this->print_stat(out, "pointer_size", sizeof(void*)*8); + return print_stat(out, "pointer_size", sizeof(void*)*8); }).then([this, &out, v = all_system_stats._curr_connections] { - return this->print_stat(out, "curr_connections", v); + return print_stat(out, "curr_connections", v); }).then([this, &out, v = all_system_stats._total_connections] { - return this->print_stat(out, "total_connections", v); + return print_stat(out, "total_connections", v); }).then([this, &out, v = all_system_stats._curr_connections] { - return this->print_stat(out, "connection_structures", v); + return print_stat(out, "connection_structures", v); }).then([this, &out, v = all_system_stats._cmd_get] { - return this->print_stat(out, "cmd_get", v); + return print_stat(out, "cmd_get", v); }).then([this, &out, v = all_system_stats._cmd_set] { - return this->print_stat(out, "cmd_set", v); + return print_stat(out, "cmd_set", v); }).then([this, &out, v = all_system_stats._cmd_flush] { - return this->print_stat(out, "cmd_flush", v); + return print_stat(out, "cmd_flush", v); }).then([this, &out] { - return this->print_stat(out, "cmd_touch", 0); + return print_stat(out, "cmd_touch", 0); }).then([this, &out, v = all_cache_stats._get_hits] { - return this->print_stat(out, "get_hits", v); + return print_stat(out, "get_hits", v); }).then([this, &out, v = all_cache_stats._get_misses] { - return this->print_stat(out, "get_misses", v); + return print_stat(out, "get_misses", v); }).then([this, &out, v = all_cache_stats._delete_misses] { - return this->print_stat(out, "delete_misses", v); + return print_stat(out, "delete_misses", v); }).then([this, &out, v = all_cache_stats._delete_hits] { - return this->print_stat(out, "delete_hits", v); + return print_stat(out, "delete_hits", v); }).then([this, &out, v = all_cache_stats._incr_misses] { - return this->print_stat(out, "incr_misses", v); + return print_stat(out, "incr_misses", v); }).then([this, &out, v = all_cache_stats._incr_hits] { - return this->print_stat(out, "incr_hits", v); + return print_stat(out, "incr_hits", v); }).then([this, &out, v = all_cache_stats._decr_misses] { - return this->print_stat(out, "decr_misses", v); + return print_stat(out, "decr_misses", v); }).then([this, &out, v = all_cache_stats._decr_hits] { - return this->print_stat(out, "decr_hits", v); + return print_stat(out, "decr_hits", v); }).then([this, &out, v = all_cache_stats._cas_misses] { - return this->print_stat(out, "cas_misses", v); + return print_stat(out, "cas_misses", v); }).then([this, &out, v = all_cache_stats._cas_hits] { - return this->print_stat(out, "cas_hits", v); + return print_stat(out, "cas_hits", v); }).then([this, &out, v = all_cache_stats._cas_badval] { - return this->print_stat(out, "cas_badval", v); + return print_stat(out, "cas_badval", v); }).then([this, &out] { - return this->print_stat(out, "touch_hits", 0); + return print_stat(out, "touch_hits", 0); }).then([this, &out] { - return this->print_stat(out, "touch_misses", 0); + return print_stat(out, "touch_misses", 0); }).then([this, &out] { - return this->print_stat(out, "auth_cmds", 0); + return print_stat(out, "auth_cmds", 0); }).then([this, &out] { - return this->print_stat(out, "auth_errors", 0); + return print_stat(out, "auth_errors", 0); }).then([this, &out] { - return this->print_stat(out, "threads", smp::count); + return print_stat(out, "threads", smp::count); }).then([this, &out, v = all_cache_stats._size] { - return this->print_stat(out, "curr_items", v); + return print_stat(out, "curr_items", v); }).then([this, &out, v = total_items] { - return this->print_stat(out, "total_items", v); + return print_stat(out, "total_items", v); }).then([this, &out, v = all_cache_stats._expired] { - return this->print_stat(out, "seastar.expired", v); + return print_stat(out, "seastar.expired", v); }).then([this, &out, v = all_cache_stats._resize_failure] { - return this->print_stat(out, "seastar.resize_failure", v); + return print_stat(out, "seastar.resize_failure", v); }).then([this, &out, v = all_cache_stats._evicted] { - return this->print_stat(out, "evictions", v); + return print_stat(out, "evictions", v); }).then([this, &out, v = all_cache_stats._bytes] { - return this->print_stat(out, "bytes", v); + return print_stat(out, "bytes", v); }).then([&out] { return out.write(msg_end); }); @@ -1420,7 +889,7 @@ private: }); } public: - ascii_protocol(sharded_cache& cache, distributed& system_stats) + ascii_protocol(sharded_cache& cache, distributed& system_stats) : _cache(cache) , _system_stats(system_stats) {} @@ -1610,12 +1079,11 @@ public: }; }; -template class udp_server { public: static const size_t default_max_datagram_size = 1400; private: - sharded_cache& _cache; + sharded_cache& _cache; distributed& _system_stats; udp_channel _chan; uint16_t _port; @@ -1639,10 +1107,10 @@ private: input_stream _in; output_stream _out; std::vector _out_bufs; - ascii_protocol _proto; + ascii_protocol _proto; connection(ipv4_addr src, uint16_t request_id, input_stream&& in, size_t out_size, - sharded_cache& c, distributed& system_stats) + sharded_cache& c, distributed& system_stats) : _src(src) , _request_id(request_id) , _in(std::move(in)) @@ -1664,7 +1132,7 @@ private: }; public: - udp_server(sharded_cache& c, distributed& system_stats, uint16_t port = 11211) + udp_server(sharded_cache& c, distributed& system_stats, uint16_t port = 11211) : _cache(c) , _system_stats(system_stats) , _port(port) @@ -1712,11 +1180,10 @@ public: future<> stop() { return make_ready_future<>(); } }; -template class tcp_server { private: lw_shared_ptr _listener; - sharded_cache& _cache; + sharded_cache& _cache; distributed& _system_stats; uint16_t _port; struct connection { @@ -1724,9 +1191,9 @@ private: socket_address _addr; input_stream _in; output_stream _out; - ascii_protocol _proto; + ascii_protocol _proto; distributed& _system_stats; - connection(connected_socket&& socket, socket_address addr, sharded_cache& c, distributed& system_stats) + connection(connected_socket&& socket, socket_address addr, sharded_cache& c, distributed& system_stats) : _socket(std::move(socket)) , _addr(addr) , _in(_socket.input()) @@ -1742,7 +1209,7 @@ private: } }; public: - tcp_server(sharded_cache& cache, distributed& system_stats, uint16_t port = 11211) + tcp_server(sharded_cache& cache, distributed& system_stats, uint16_t port = 11211) : _cache(cache) , _system_stats(system_stats) , _port(port) @@ -1767,13 +1234,12 @@ public: future<> stop() { return make_ready_future<>(); } }; -template class stats_printer { private: timer<> _timer; - sharded_cache& _cache; + sharded_cache& _cache; public: - stats_printer(sharded_cache& cache) + stats_printer(sharded_cache& cache) : _cache(cache) {} void start() { @@ -1787,11 +1253,6 @@ public: << std::setprecision(2) << std::fixed << "get: " << stats._get_hits << "/" << gets_total << " (" << get_hit_rate << "%) " << "set: " << stats._set_replaces << "/" << sets_total << " (" << set_replace_rate << "%)"; - if (WithFlashCache) { - std::cout << " reclaims: " << stats._reclaims << " " - << "loads: " << stats._loads << " " - << "stores: " << stats._stores << " "; - } std::cout << std::endl; }); }); @@ -1803,25 +1264,18 @@ public: } /* namespace memcache */ -template -int start_instance(int ac, char** av) { - distributed> cache_peers; - memcache::sharded_cache cache(cache_peers); +int main(int ac, char** av) { + distributed cache_peers; + memcache::sharded_cache cache(cache_peers); distributed system_stats; - distributed> udp_server; - distributed> tcp_server; - memcache::stats_printer stats(cache); + distributed udp_server; + distributed tcp_server; + memcache::stats_printer stats(cache); namespace bpo = boost::program_options; app_template app; - if (WithFlashCache) { - app.add_options() - ("device", bpo::value(), - "Flash device") - ; - } app.add_options() - ("max-datagram-size", bpo::value()->default_value(memcache::udp_server::default_max_datagram_size), + ("max-datagram-size", bpo::value()->default_value(memcache::udp_server::default_max_datagram_size), "Maximum size of UDP datagram") ("stats", "Print basic statistics periodically (every second)") @@ -1840,34 +1294,12 @@ int start_instance(int ac, char** av) { return cache_peers.start().then([&system_stats] { return system_stats.start(clock_type::now()); }).then([&] { - if (WithFlashCache) { - auto device_path = config["device"].as(); - return engine().open_file_dma(device_path).then([&] (file f) { - auto dev = make_lw_shared({std::move(f)}); - return dev->f().stat().then([&, dev] (struct stat st) mutable { - assert(S_ISBLK(st.st_mode)); - return dev->f().size().then([&, dev] (size_t device_size) mutable { - auto per_cpu_device_size = device_size / smp::count; - std::cout << PLATFORM << " flashcached " << VERSION << "\n"; - std::cout << "device size: " << device_size << " bytes\n"; - std::cout << "per-cpu device size: " << per_cpu_device_size << " bytes\n"; - - for (auto cpu = 0U; cpu < smp::count; cpu++) { - auto offset = cpu * per_cpu_device_size; - cache_peers.invoke_on(cpu, &memcache::cache::setup, - make_foreign(dev), std::move(offset), std::move(per_cpu_device_size)); - } - }); - }); - }); - } else { - std::cout << PLATFORM << " memcached " << VERSION << "\n"; - return make_ready_future<>(); - } + std::cout << PLATFORM << " memcached " << VERSION << "\n"; + return make_ready_future<>(); }).then([&, port] { return tcp_server.start(std::ref(cache), std::ref(system_stats), port); }).then([&tcp_server] { - return tcp_server.invoke_on_all(&memcache::tcp_server::start); + return tcp_server.invoke_on_all(&memcache::tcp_server::start); }).then([&, port] { if (engine().net().has_per_core_namespace()) { return udp_server.start(std::ref(cache), std::ref(system_stats), port); @@ -1875,10 +1307,10 @@ int start_instance(int ac, char** av) { return udp_server.start_single(std::ref(cache), std::ref(system_stats), port); } }).then([&] { - return udp_server.invoke_on_all(&memcache::udp_server::set_max_datagram_size, + return udp_server.invoke_on_all(&memcache::udp_server::set_max_datagram_size, (size_t)config["max-datagram-size"].as()); }).then([&] { - return udp_server.invoke_on_all(&memcache::udp_server::start); + return udp_server.invoke_on_all(&memcache::udp_server::start); }).then([&stats, start_stats = config.count("stats")] { if (start_stats) { stats.start(); @@ -1887,10 +1319,3 @@ int start_instance(int ac, char** av) { }); } -int memcache_instance::run(int ac, char** av) { - return start_instance(ac, av); -} - -int memcache_instance::run(int ac, char** av) { - return start_instance(ac, av); -} diff --git a/apps/memcached/memcached.cc b/apps/memcached/memcached.cc deleted file mode 100644 index 1cb3c1b840..0000000000 --- a/apps/memcached/memcached.cc +++ /dev/null @@ -1,30 +0,0 @@ -/* - * This file is open source software, licensed to you under the terms - * of the Apache License, Version 2.0 (the "License"). See the NOTICE file - * distributed with this work for additional information regarding copyright - * ownership. You may not use this file except in compliance with the License. - * - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -/* - * Copyright 2014 Cloudius Systems - * memcached - */ - -#include "memcached.hh" - -int main(int ac, char** av) -{ - constexpr bool WithFlashCache = false; - memcache_instance instance; - return instance.run(ac, av); -} diff --git a/apps/memcached/memcached.hh b/apps/memcached/memcached.hh index b1492a1bfd..c74198627a 100644 --- a/apps/memcached/memcached.hh +++ b/apps/memcached/memcached.hh @@ -20,24 +20,8 @@ #include "core/sstring.hh" -template -class memcache_instance; - -template <> -class memcache_instance { -public: - int run(int ac, char** av); -}; - -template <> -class memcache_instance { -public: - int run(int ac, char** av); -}; - namespace memcache { -template class cache; class item_key { diff --git a/configure.py b/configure.py index cef642493c..eb86aa27ee 100755 --- a/configure.py +++ b/configure.py @@ -109,7 +109,6 @@ apps = [ 'apps/seawreck/seawreck', 'apps/seastar/seastar', 'apps/memcached/memcached', - 'apps/memcached/flashcached', ] all_artifacts = apps + tests @@ -198,16 +197,11 @@ memcache_base = [ 'apps/memcached/ascii.rl' ] + libnet + core -memcache = [ - 'apps/memcached/memcache.cc', -] + memcache_base - deps = { 'apps/seastar/seastar': ['apps/seastar/main.cc'] + core, 'tests/test-reactor': ['tests/test-reactor.cc'] + core, 'apps/httpd/httpd': ['apps/httpd/httpd.cc', 'apps/httpd/request_parser.rl'] + libnet + core, - 'apps/memcached/memcached': ['apps/memcached/memcached.cc'] + memcache, - 'apps/memcached/flashcached': ['apps/memcached/flashcached.cc'] + memcache, + 'apps/memcached/memcached': ['apps/memcached/memcache.cc'] + memcache_base, 'tests/memcached/test_ascii_parser': ['tests/memcached/test_ascii_parser.cc'] + memcache_base, 'tests/fileiotest': ['tests/fileiotest.cc'] + core, 'tests/directory_test': ['tests/directory_test.cc'] + core, From 03ae698ce17a138e58039caaa64bdb8904c9f422 Mon Sep 17 00:00:00 2001 From: "Raphael S. Carvalho" Date: Mon, 16 Feb 2015 15:33:21 -0200 Subject: [PATCH 05/15] core: Add slab allocator * Slab allocator resembles the one used by stock memcached, where it's composed of slab classes and slab classes are composed of chunks of the same size. * Per-slab-class LRU is also available. * Slab allocator exports stats to collectd. Signed-off-by: Raphael S. Carvalho --- core/slab.hh | 336 +++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 336 insertions(+) create mode 100644 core/slab.hh diff --git a/core/slab.hh b/core/slab.hh new file mode 100644 index 0000000000..9daca2dfc6 --- /dev/null +++ b/core/slab.hh @@ -0,0 +1,336 @@ +/* + * This file is open source software, licensed to you under the terms + * of the Apache License, Version 2.0 (the "License"). See the NOTICE file + * distributed with this work for additional information regarding copyright + * ownership. You may not use this file except in compliance with the License. + * + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +/* + * Copyright 2015 Cloudius Systems + */ +#ifndef __SLAB_ALLOCATOR__ +#define __SLAB_ALLOCATOR__ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include "core/scollectd.hh" +#include "core/align.hh" + +namespace bi = boost::intrusive; + +/* + * Item requirements + * - Extend it to slab_item_base. + * - First parameter of constructor must be uint8_t _slab_class_id. + * - Implement get_slab_class_id() to return _slab_class_id. + * - Implement is_unlocked() to check if Item can be evicted. + */ + +class slab_item_base { + bi::list_member_hook<> _lru_link; + + template + friend class slab_class; +}; + +template +class slab_class { +private: + std::vector _slab_pages; + std::stack _free_objects; + bi::list, &slab_item_base::_lru_link>> _lru; + size_t _size; // size of objects + uint8_t _slab_class_id; +private: + template + inline Item* create_item(void *object, Args&&... args) { + Item *new_item = new(object) Item(_slab_class_id, std::forward(args)...); + _lru.push_front(reinterpret_cast(*new_item)); + return new_item; + } + + inline void* evict_lru_item(std::function& erase_func) { + if (_lru.empty()) { + return nullptr; + } + + Item& victim = reinterpret_cast(_lru.back()); + assert(victim.is_unlocked()); + _lru.erase(_lru.iterator_to(reinterpret_cast(victim))); + // WARNING: You need to make sure that erase_func will not release victim back to slab. + erase_func(victim); + + return reinterpret_cast(&victim); + } +public: + slab_class(size_t size, uint8_t slab_class_id) + : _size(size) + , _slab_class_id(slab_class_id) + { + } + slab_class(slab_class&&) = default; + ~slab_class() { + _lru.clear(); + for (auto& slab : _slab_pages) { + free(slab); + } + } + + size_t size() const { + return _size; + } + + bool empty() const { + return _free_objects.empty(); + } + + bool has_no_slab_pages() const { + return _slab_pages.empty(); + } + + template + Item *create(Args&&... args) { + assert(!_free_objects.empty()); + auto object = _free_objects.top(); + _free_objects.pop(); + + return create_item(object, std::forward(args)...); + } + + template + Item *create_from_new_page(uint64_t max_object_size, Args&&... args) { + constexpr size_t alignment = std::alignment_of::value; + void *slab_page = aligned_alloc(alignment, max_object_size); + if (!slab_page) { + throw std::bad_alloc{}; + } + _slab_pages.push_back(slab_page); + + assert(_size % alignment == 0); + auto objects = max_object_size / _size; + auto object = reinterpret_cast(slab_page); + for (auto i = 1u; i < objects; i++) { + object += _size; + _free_objects.push(object); + } + + // first object from the allocated slab page is returned. + return create_item(slab_page, std::forward(args)...); + } + + template + Item *create_from_lru(std::function& erase_func, Args&&... args) { + auto victim_object = evict_lru_item(erase_func); + if (!victim_object) { + throw std::bad_alloc{}; + } + return create_item(victim_object, std::forward(args)...); + } + + void free_item(Item *item) { + void *object = item; + _lru.erase(_lru.iterator_to(reinterpret_cast(*item))); + _free_objects.push(object); + } + + void touch_item(Item *item) { + auto& item_ref = reinterpret_cast(*item); + _lru.erase(_lru.iterator_to(item_ref)); + _lru.push_front(item_ref); + } + + void remove_item_from_lru(Item *item) { + auto& item_ref = reinterpret_cast(*item); + _lru.erase(_lru.iterator_to(item_ref)); + } + + void insert_item_into_lru(Item *item) { + auto& item_ref = reinterpret_cast(*item); + _lru.push_front(item_ref); + } +}; + +template +class slab_allocator { +private: + std::vector _slab_class_sizes; + std::vector> _slab_classes; + std::vector _registrations; + std::function _erase_func; + uint64_t _max_object_size; + uint64_t _available_slab_pages; + struct collectd_stats { + uint64_t allocs; + uint64_t frees; + } _stats; +private: + void initialize_slab_classes(double growth_factor, uint64_t limit) { + constexpr size_t alignment = std::alignment_of::value; + constexpr size_t initial_size = 96; + size_t size = initial_size; // initial object size + uint8_t slab_class_id = 0U; + + while (_max_object_size / size > 1) { + size = align_up(size, alignment); + _slab_class_sizes.push_back(size); + _slab_classes.emplace_back(size, slab_class_id); + size *= growth_factor; + assert(slab_class_id < std::numeric_limits::max()); + slab_class_id++; + } + _slab_class_sizes.push_back(_max_object_size); + _slab_classes.emplace_back(_max_object_size, slab_class_id); + } + + slab_class* get_slab_class(const size_t size) { + // given a size, find slab class with binary search. + auto i = std::lower_bound(_slab_class_sizes.begin(), _slab_class_sizes.end(), size); + if (i == _slab_class_sizes.end()) { + return nullptr; + } + auto dist = std::distance(_slab_class_sizes.begin(), i); + return &_slab_classes[dist]; + } + + slab_class* get_slab_class(Item* item) { + auto slab_class_id = item->get_slab_class_id(); + assert(slab_class_id >= 0 && slab_class_id < _slab_classes.size()); + return &_slab_classes[slab_class_id]; + } + + void register_collectd_metrics() { + auto add = [this] (auto type_name, auto name, auto data_type, auto func) { + _registrations.push_back( + scollectd::add_polled_metric(scollectd::type_instance_id("slab", + scollectd::per_cpu_plugin_instance, + type_name, name), + scollectd::make_typed(data_type, func))); + }; + + add("total_operations", "malloc", scollectd::data_type::DERIVE, [&] { return _stats.allocs; }); + add("total_operations", "free", scollectd::data_type::DERIVE, [&] { return _stats.frees; }); + add("objects", "malloc", scollectd::data_type::GAUGE, [&] { return _stats.allocs - _stats.frees; }); + } +public: + slab_allocator(double growth_factor, uint64_t limit, uint64_t max_object_size) + : _max_object_size(max_object_size) + , _available_slab_pages(limit / max_object_size) + { + initialize_slab_classes(growth_factor, limit); + register_collectd_metrics(); + } + + slab_allocator(double growth_factor, uint64_t limit, uint64_t max_object_size, std::function erase_func) + : _erase_func(std::move(erase_func)) + , _max_object_size(max_object_size) + , _available_slab_pages(limit / max_object_size) + { + initialize_slab_classes(growth_factor, limit); + register_collectd_metrics(); + } + + ~slab_allocator() + { + _registrations.clear(); + } + + /** + * Create an item from a given slab class based on requested size. + */ + template + Item* create(const size_t size, Args&&... args) { + auto slab_class = get_slab_class(size); + if (!slab_class) { + throw std::bad_alloc{}; + } + + Item *item = nullptr; + if (!slab_class->empty()) { + item = slab_class->create(std::forward(args)...); + _stats.allocs++; + } else { + if (_available_slab_pages > 0 || slab_class->has_no_slab_pages()) { + item = slab_class->create_from_new_page(_max_object_size, std::forward(args)...); + if (_available_slab_pages > 0) { + _available_slab_pages--; + } + _stats.allocs++; + } else if (_erase_func) { + item = slab_class->create_from_lru(_erase_func, std::forward(args)...); + } + } + return item; + } + + void lock_item(Item *item) { + // remove item from the lru of its slab class. + auto slab_class = get_slab_class(item); + slab_class->remove_item_from_lru(item); + } + + void unlock_item(Item *item) { + // insert item into the lru of its slab class. + auto slab_class = get_slab_class(item); + slab_class->insert_item_into_lru(item); + } + + /** + * Free an item back to its original slab class. + */ + void free(Item *item) { + if (item) { + auto slab_class = get_slab_class(item); + slab_class->free_item(item); + _stats.frees++; + } + } + + /** + * Update item position in the LRU of its slab class. + */ + void touch(Item *item) { + if (item) { + auto slab_class = get_slab_class(item); + slab_class->touch_item(item); + } + } + + /** + * Helper function: Print all available slab classes and their respective properties. + */ + void print_slab_classes() { + auto class_id = 0; + for (auto& slab_class : _slab_classes) { + size_t size = slab_class.size(); + printf("slab[%3d]\tsize: %10lu\tper-slab-page: %5lu\n", class_id, size, _max_object_size / size); + class_id++; + } + } + + /** + * Helper function: Useful for getting a slab class' chunk size from a size parameter. + */ + size_t class_size(const size_t size) { + auto slab_class = get_slab_class(size); + return (slab_class) ? slab_class->size() : 0; + } +}; + +#endif /* __SLAB_ALLOCATOR__ */ From 66117b2697ef7633b3a493d967079b69834de61b Mon Sep 17 00:00:00 2001 From: "Raphael S. Carvalho" Date: Mon, 16 Feb 2015 15:42:49 -0200 Subject: [PATCH 06/15] memcached: Expiry should be stored as a 32-bit value Expiration time is a 32-bit value as specified by memcached protocol. Thus, no need to store it as a 64-bit value. When the value is needed, convert it to the target type. Change intended to save space from item structure. In addition, there is no need to insert an entry into _alive when expiration time is zero, meaning item will never expire. Signed-off-by: Raphael S. Carvalho --- apps/memcached/memcache.cc | 56 ++++++++++++++++++++++++-------------- 1 file changed, 36 insertions(+), 20 deletions(-) diff --git a/apps/memcached/memcache.cc b/apps/memcached/memcache.cc index 116aa7ef82..8285938561 100644 --- a/apps/memcached/memcache.cc +++ b/apps/memcached/memcache.cc @@ -55,6 +55,30 @@ namespace memcache { template using optional = boost::optional; +struct expiration { + static constexpr uint32_t seconds_in_a_month = 60U * 60 * 24 * 30; + uint32_t _time; + + expiration() : _time(0U) {} + + expiration(uint32_t seconds) { + if (seconds == 0U) { + _time = 0U; // means never expire. + } else if (seconds <= seconds_in_a_month) { + _time = seconds + time(0); // from delta + } else { + _time = seconds; // from real time + } + } + + bool ever_expires() { + return _time; + } + + clock_type::time_point to_time_point() { + return clock_type::time_point(std::chrono::seconds(_time)); + } +}; class item { public: @@ -69,13 +93,13 @@ private: const sstring _ascii_prefix; version_type _version; int _ref_count; + expiration _expiry; hook_type _cache_link; bi::list_member_hook<> _lru_link; bi::list_member_hook<> _timer_link; - time_point _expiry; friend class cache; public: - item(item_key&& key, sstring&& ascii_prefix, sstring&& data, clock_type::time_point expiry, version_type version = 1) + item(item_key&& key, sstring&& ascii_prefix, sstring&& data, expiration expiry, version_type version = 1) : _key(std::move(key)) , _data(std::move(data)) , _ascii_prefix(std::move(ascii_prefix)) @@ -89,7 +113,7 @@ public: item(item&&) = delete; clock_type::time_point get_timeout() { - return _expiry; + return _expiry.to_time_point(); } version_type version() { @@ -236,7 +260,7 @@ struct item_insertion_data { item_key key; sstring ascii_prefix; sstring data; - clock_type::time_point expiry; + expiration expiry; }; class cache { @@ -268,7 +292,9 @@ private: _cache.erase(_cache.iterator_to(item_ref)); } if (IsInTimerList) { - _alive.remove(item_ref); + if (item_ref._expiry.ever_expires()) { + _alive.remove(item_ref); + } } _lru.erase(_lru.iterator_to(item_ref)); _stats._bytes -= item_footprint(item_ref); @@ -304,7 +330,7 @@ private: auto insert_result = _cache.insert(*new_item); assert(insert_result.second); - if (_alive.insert(*new_item)) { + if (insertion.expiry.ever_expires() && _alive.insert(*new_item)) { _timer.rearm(new_item->get_timeout()); } _lru.push_front(*new_item); @@ -320,7 +346,7 @@ private: intrusive_ptr_add_ref(new_item); auto& item_ref = *new_item; _cache.insert(item_ref); - if (_alive.insert(item_ref)) { + if (insertion.expiry.ever_expires() && _alive.insert(item_ref)) { _timer.rearm(item_ref.get_timeout()); } _lru.push_front(*new_item); @@ -734,7 +760,6 @@ private: item_insertion_data _insertion; std::vector _items; private: - static constexpr uint32_t seconds_in_a_month = 60 * 60 * 24 * 30; static constexpr const char *msg_crlf = "\r\n"; static constexpr const char *msg_error = "ERROR\r\n"; static constexpr const char *msg_stored = "STORED\r\n"; @@ -894,22 +919,12 @@ public: , _system_stats(system_stats) {} - clock_type::time_point seconds_to_time_point(uint32_t seconds) { - if (seconds == 0) { - return clock_type::time_point::max(); - } else if (seconds <= seconds_in_a_month) { - return clock_type::now() + std::chrono::seconds(seconds); - } else { - return clock_type::time_point(std::chrono::seconds(seconds)); - } - } - void prepare_insertion() { _insertion = item_insertion_data{ .key = std::move(_parser._key), .ascii_prefix = make_sstring(" ", _parser._flags_str, " ", _parser._size_str), .data = std::move(_parser._blob), - .expiry = seconds_to_time_point(_parser._expiration) + .expiry = expiration(_parser._expiration) }; } @@ -1005,7 +1020,8 @@ public: { _system_stats.local()._cmd_flush++; if (_parser._expiration) { - auto f = _cache.flush_at(seconds_to_time_point(_parser._expiration)); + auto expiry = expiration(_parser._expiration); + auto f = _cache.flush_at(expiry.to_time_point()); if (_parser._noreply) { return f; } From 6dea362304a597084836c9fb8daa080ce094bb0f Mon Sep 17 00:00:00 2001 From: "Raphael S. Carvalho" Date: Thu, 19 Feb 2015 20:51:24 -0200 Subject: [PATCH 07/15] Add experimental::string_view option to scattered_message::append_static Signed-off-by: Raphael S. Carvalho --- core/scattered_message.hh | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/core/scattered_message.hh b/core/scattered_message.hh index 1b00e661d5..69292e4329 100644 --- a/core/scattered_message.hh +++ b/core/scattered_message.hh @@ -29,6 +29,7 @@ #include "sstring.hh" #include #include +#include template class scattered_message { @@ -62,6 +63,10 @@ public: append_static(s.begin(), s.size()); } + void append_static(const std::experimental::string_view& s) { + append_static(s.data(), s.size()); + } + template void append(basic_sstring s) { if (s.size()) { From aa6d850228601977f6376ef0434f0606ddbb87df Mon Sep 17 00:00:00 2001 From: "Raphael S. Carvalho" Date: Fri, 20 Feb 2015 12:54:43 -0200 Subject: [PATCH 08/15] memcached: Integrate slab allocator * Item structure was changed to work with slab, where its last field is used to access key, value, and ascii prefix, respectively. * Item structured was worked to have a smaller footprint. It was reduced from 104 bytes to 72 bytes. * Reclaimer was temporarily disabled. * Global LRU was removed from the cache. Now LRU eviction is done on a per-slab-class basis, whenever the slab allocator runs out of slab pages. * Fragmentation issue is naturally solved with the slab algorithm, where slab classes have chunks of the same size. --- apps/memcached/memcache.cc | 273 ++++++++++++++++++++++-------------- apps/memcached/memcached.hh | 1 + 2 files changed, 172 insertions(+), 102 deletions(-) diff --git a/apps/memcached/memcache.cc b/apps/memcached/memcache.cc index 8285938561..881f092cc3 100644 --- a/apps/memcached/memcache.cc +++ b/apps/memcached/memcache.cc @@ -16,7 +16,7 @@ * under the License. */ /* - * Copyright 2014 Cloudius Systems + * Copyright 2014-2015 Cloudius Systems */ #include @@ -35,10 +35,12 @@ #include "core/units.hh" #include "core/distributed.hh" #include "core/vector-data-sink.hh" +#include "core/bitops.hh" +#include "core/slab.hh" +#include "core/align.hh" #include "net/api.hh" #include "net/packet-data-source.hh" #include "apps/memcached/ascii.hh" -#include "core/bitops.hh" #include "memcached.hh" #include @@ -52,6 +54,11 @@ namespace bi = boost::intrusive; namespace memcache { +static constexpr double default_slab_growth_factor = 1.25; +static constexpr uint64_t default_slab_page_size = 1UL*MB; +static constexpr uint64_t default_per_cpu_slab_size = 64UL*MB; +static __thread slab_allocator* slab; + template using optional = boost::optional; @@ -80,33 +87,49 @@ struct expiration { } }; -class item { +class item : public slab_item_base { public: using version_type = uint64_t; using time_point = clock_type::time_point; using duration = clock_type::duration; + static constexpr uint8_t field_alignment = alignof(void*); private: using hook_type = bi::unordered_set_member_hook<>; // TODO: align shared data to cache line boundary - item_key _key; - const sstring _data; - const sstring _ascii_prefix; version_type _version; - int _ref_count; - expiration _expiry; hook_type _cache_link; - bi::list_member_hook<> _lru_link; bi::list_member_hook<> _timer_link; + size_t _key_hash; + expiration _expiry; + uint32_t _value_size; + uint16_t _ref_count; + uint8_t _key_size; + uint8_t _ascii_prefix_size; + uint8_t _slab_class_id; + char _unused[3]; + char _data[]; // layout: data=key, (data+key_size)=ascii_prefix, (data+key_size+ascii_prefix_size)=value. friend class cache; public: - item(item_key&& key, sstring&& ascii_prefix, sstring&& data, expiration expiry, version_type version = 1) - : _key(std::move(key)) - , _data(std::move(data)) - , _ascii_prefix(std::move(ascii_prefix)) - , _version(version) - , _ref_count(0) + item(uint8_t slab_class_id, item_key&& key, sstring&& ascii_prefix, + sstring&& value, expiration expiry, version_type version = 1) + : _version(version) + , _key_hash(key.hash()) , _expiry(expiry) + , _value_size(value.size()) + , _ref_count(0) + , _key_size(key.key().size()) + , _ascii_prefix_size(ascii_prefix.size()) + , _slab_class_id(slab_class_id) { + assert(_key_size <= std::numeric_limits::max()); + assert(_ascii_prefix_size <= std::numeric_limits::max()); + // storing key + memcpy(_data, key.key().c_str(), _key_size); + // storing ascii_prefix + memcpy(_data + align_up(_key_size, field_alignment), ascii_prefix.c_str(), _ascii_prefix_size); + // storing value + memcpy(_data + align_up(_key_size, field_alignment) + align_up(_ascii_prefix_size, field_alignment), + value.c_str(), _value_size); } item(const item&) = delete; @@ -120,25 +143,40 @@ public: return _version; } - const sstring& data() { - return _data; + const std::experimental::string_view key() const { + return std::experimental::string_view(_data, _key_size); } - const sstring& ascii_prefix() { - return _ascii_prefix; + const std::experimental::string_view ascii_prefix() const { + const char *p = _data + align_up(_key_size, field_alignment); + return std::experimental::string_view(p, _ascii_prefix_size); } - const sstring& key() { - return _key.key(); + const std::experimental::string_view value() const { + const char *p = _data + align_up(_key_size, field_alignment) + + align_up(_ascii_prefix_size, field_alignment); + return std::experimental::string_view(p, _value_size); + } + + size_t key_size() const { + return _key_size; + } + + size_t ascii_prefix_size() const { + return _ascii_prefix_size; + } + + size_t value_size() const { + return _value_size; } optional data_as_integral() { - auto str = _data.c_str(); + auto str = value().data(); if (str[0] == '-') { return {}; } - auto len = _data.size(); + auto len = _value_size; // Strip trailing space while (len && str[len - 1] == ' ') { @@ -154,26 +192,43 @@ public: // needed by timer_set bool cancel() { - assert(false); return false; } + // get_slab_class_id() and is_unlocked() are methods required by slab class. + const uint8_t get_slab_class_id() { + return _slab_class_id; + } + bool is_unlocked() { + return _ref_count == 1; + } + friend bool operator==(const item &a, const item &b) { - return a._key == b._key; + return (a._key_hash == b._key_hash) && + (a._key_size == b._key_size) && + (memcmp(a._data, b._data, a._key_size) == 0); } friend std::size_t hash_value(const item &i) { - return std::hash()(i._key); + return i._key_hash; } friend inline void intrusive_ptr_add_ref(item* it) { + assert(it->_ref_count >= 0); ++it->_ref_count; + if (it->_ref_count == 2) { + slab->lock_item(it); + } } friend inline void intrusive_ptr_release(item* it) { - if (--it->_ref_count == 0) { - delete it; + --it->_ref_count; + if (it->_ref_count == 1) { + slab->unlock_item(it); + } else if (it->_ref_count == 0) { + slab->free(it); } + assert(it->_ref_count >= 0); } friend class item_key_cmp; @@ -181,12 +236,19 @@ public: struct item_key_cmp { +private: + bool compare(const item_key& key, const item& it) const { + return (it._key_hash == key.hash()) && + (it._key_size == key.key().size()) && + (memcmp(it._data, key.key().c_str(), it._key_size) == 0); + } +public: bool operator()(const item_key& key, const item& it) const { - return key == it._key; + return compare(key, it); } bool operator()(const item& it, const item_key& key) const { - return key == it._key; + return compare(key, it); } }; @@ -275,18 +337,40 @@ private: size_t _resize_up_threshold = load_factor * initial_bucket_count; cache_type::bucket_type* _buckets; cache_type _cache; - bi::list, &item::_lru_link>> _lru; timer_set _alive; timer<> _timer; cache_stats _stats; timer<> _flush_timer; - memory::reclaimer _reclaimer; private: - size_t item_footprint(item& item_ref) { - return sizeof(item) + item_ref._data.size() + item_ref._ascii_prefix.size() + item_ref.key().size(); + size_t item_size(item& item_ref) { + constexpr size_t field_alignment = alignof(void*); + return sizeof(item) + + align_up(item_ref.key_size(), field_alignment) + + align_up(item_ref.ascii_prefix_size(), field_alignment) + + item_ref.value_size(); } - template + size_t item_size(item_insertion_data& insertion) { + constexpr size_t field_alignment = alignof(void*); + auto size = sizeof(item) + + align_up(insertion.key.key().size(), field_alignment) + + align_up(insertion.ascii_prefix.size(), field_alignment) + + insertion.data.size(); +#ifdef __DEBUG__ + static bool print_item_footprint = true; + if (print_item_footprint) { + print_item_footprint = false; + std::cout << __FUNCTION__ << ": " << size << "\n"; + std::cout << "sizeof(item) " << sizeof(item) << "\n"; + std::cout << "key.size " << insertion.key.key().size() << "\n"; + std::cout << "value.size " << insertion.data.size() << "\n"; + std::cout << "ascii_prefix.size " << insertion.ascii_prefix.size() << "\n"; + } +#endif + return size; + } + + template void erase(item& item_ref) { if (IsInCache) { _cache.erase(_cache.iterator_to(item_ref)); @@ -296,9 +380,11 @@ private: _alive.remove(item_ref); } } - _lru.erase(_lru.iterator_to(item_ref)); - _stats._bytes -= item_footprint(item_ref); - intrusive_ptr_release(&item_ref); + _stats._bytes -= item_size(item_ref); + if (Release) { + // memory used by item shouldn't be freed when slab is replacing it with another item. + intrusive_ptr_release(&item_ref); + } } void expire() { @@ -321,27 +407,29 @@ private: inline cache_iterator add_overriding(cache_iterator i, item_insertion_data& insertion) { auto& old_item = *i; - - auto new_item = new item(Origin::move_if_local(insertion.key), Origin::move_if_local(insertion.ascii_prefix), - Origin::move_if_local(insertion.data), insertion.expiry, old_item._version + 1); - intrusive_ptr_add_ref(new_item); + uint64_t old_item_version = old_item._version; erase(old_item); + size_t size = item_size(insertion); + auto new_item = slab->create(size, Origin::move_if_local(insertion.key), Origin::move_if_local(insertion.ascii_prefix), + Origin::move_if_local(insertion.data), insertion.expiry, old_item_version + 1); + intrusive_ptr_add_ref(new_item); + auto insert_result = _cache.insert(*new_item); assert(insert_result.second); if (insertion.expiry.ever_expires() && _alive.insert(*new_item)) { _timer.rearm(new_item->get_timeout()); } - _lru.push_front(*new_item); - _stats._bytes += item_footprint(*new_item); + _stats._bytes += size; return insert_result.first; } template inline void add_new(item_insertion_data& insertion) { - auto new_item = new item(Origin::move_if_local(insertion.key), Origin::move_if_local(insertion.ascii_prefix), + size_t size = item_size(insertion); + auto new_item = slab->create(size, Origin::move_if_local(insertion.key), Origin::move_if_local(insertion.ascii_prefix), Origin::move_if_local(insertion.data), insertion.expiry); intrusive_ptr_add_ref(new_item); auto& item_ref = *new_item; @@ -349,8 +437,7 @@ private: if (insertion.expiry.ever_expires() && _alive.insert(item_ref)) { _timer.rearm(item_ref.get_timeout()); } - _lru.push_front(*new_item); - _stats._bytes += item_footprint(item_ref); + _stats._bytes += size; maybe_rehash(); } @@ -362,7 +449,6 @@ private: _buckets = new cache_type::bucket_type[new_size]; } catch (const std::bad_alloc& e) { _stats._resize_failure++; - evict(100); // In order to amortize the cost of resize failure return; } _cache.rehash(typename cache_type::bucket_traits(_buckets, new_size)); @@ -370,57 +456,24 @@ private: _resize_up_threshold = _cache.bucket_count() * load_factor; } } - - // Evicts at most @count items. - void evict(size_t count) { - while (!_lru.empty() && count--) { - erase(_lru.back()); - _stats._evicted++; - } - } - - void reclaim(size_t target) { - size_t reclaimed_so_far = 0; - _stats._reclaims++; - - auto i = _lru.end(); - if (i == _lru.begin()) { - return; - } - - --i; - - bool done = false; - do { - item& victim = *i; - if (i != _lru.begin()) { - --i; - } else { - done = true; - } - - // If the item is shared, we can not assume that removing it from - // cache would cause the memory to be reclaimed in a timely manner - // so we reclaim only items which are not shared. - if (victim._ref_count == 1) { - reclaimed_so_far += item_footprint(victim); - erase(victim); - _stats._evicted++; - - if (reclaimed_so_far >= target) { - done = true; - } - } - } while (!done); - } public: - cache() + cache(uint64_t per_cpu_slab_size, uint64_t slab_page_size) : _buckets(new cache_type::bucket_type[initial_bucket_count]) , _cache(cache_type::bucket_traits(_buckets, initial_bucket_count)) - , _reclaimer([this] { reclaim(5*MB); }) { _timer.set_callback([this] { expire(); }); _flush_timer.set_callback([this] { flush_all(); }); + + // initialize per-thread slab allocator. + slab = new slab_allocator(default_slab_growth_factor, per_cpu_slab_size, slab_page_size, + [this](item& item_ref) { erase(item_ref); _stats._evicted++; }); +#ifdef __DEBUG__ + static bool print_slab_classes = true; + if (print_slab_classes) { + print_slab_classes = false; + slab->print_slab_classes(); + } +#endif } ~cache() { @@ -496,8 +549,6 @@ public: } _stats._get_hits++; auto& item_ref = *i; - _lru.erase(_lru.iterator_to(item_ref)); - _lru.push_front(item_ref); return item_ptr(&item_ref); } @@ -546,7 +597,7 @@ public: } item_insertion_data insertion { .key = Origin::move_if_local(key), - .ascii_prefix = item_ref._ascii_prefix, + .ascii_prefix = sstring(item_ref.ascii_prefix().data(), item_ref.ascii_prefix_size()), .data = to_sstring(*value + delta), .expiry = item_ref._expiry }; @@ -569,7 +620,7 @@ public: } item_insertion_data insertion { .key = Origin::move_if_local(key), - .ascii_prefix = item_ref._ascii_prefix, + .ascii_prefix = sstring(item_ref.ascii_prefix().data(), item_ref.ascii_prefix_size()), .data = to_sstring(*value - std::min(*value, delta)), .expiry = item_ref._expiry }; @@ -772,6 +823,7 @@ private: static constexpr const char *msg_version = "VERSION " VERSION_STRING "\r\n"; static constexpr const char *msg_exists = "EXISTS\r\n"; static constexpr const char *msg_stat = "STAT "; + static constexpr const char *msg_out_of_memory = "SERVER_ERROR Out of memory allocating new item\r\n"; static constexpr const char *msg_error_non_numeric_value = "CLIENT_ERROR cannot increment or decrement non-numeric value\r\n"; private: template @@ -790,7 +842,7 @@ private: } msg.append_static(msg_crlf); - msg.append_static(item->data()); + msg.append_static(item->value()); msg.append_static(msg_crlf); msg.on_delete([item = std::move(item)] {}); } @@ -1063,7 +1115,7 @@ public: if (!incremented) { return out.write(msg_error_non_numeric_value); } - return out.write(item->data()).then([&out] { + return out.write(item->value().data(), item->value_size()).then([&out] { return out.write(msg_crlf); }); }); @@ -1084,13 +1136,25 @@ public: if (!decremented) { return out.write(msg_error_non_numeric_value); } - return out.write(item->data()).then([&out] { + return out.write(item->value().data(), item->value_size()).then([&out] { return out.write(msg_crlf); }); }); } }; std::abort(); + }).rescue([this, &out] (auto get_ex) -> future<> { + // FIXME: rescue being scheduled even though no exception was triggered has a + // performance cost of about 2.6%. Not using it means maintainability penalty. + try { + get_ex(); + } catch (std::bad_alloc& e) { + if (_parser._noreply) { + return make_ready_future<>(); + } + return out.write(msg_out_of_memory); + } + return make_ready_future<>(); }); }; }; @@ -1293,6 +1357,10 @@ int main(int ac, char** av) { app.add_options() ("max-datagram-size", bpo::value()->default_value(memcache::udp_server::default_max_datagram_size), "Maximum size of UDP datagram") + ("max-slab-size", bpo::value()->default_value(memcache::default_per_cpu_slab_size/MB), + "Maximum memory to be used for items (value in megabytes)") + ("slab-page-size", bpo::value()->default_value(memcache::default_slab_page_size/MB), + "Size of slab page (value in megabytes)") ("stats", "Print basic statistics periodically (every second)") ("port", bpo::value()->default_value(11211), @@ -1307,7 +1375,9 @@ int main(int ac, char** av) { auto&& config = app.configuration(); uint16_t port = config["port"].as(); - return cache_peers.start().then([&system_stats] { + uint64_t per_cpu_slab_size = config["max-slab-size"].as() * MB; + uint64_t slab_page_size = config["slab-page-size"].as() * MB; + return cache_peers.start(std::move(per_cpu_slab_size), std::move(slab_page_size)).then([&system_stats] { return system_stats.start(clock_type::now()); }).then([&] { std::cout << PLATFORM << " memcached " << VERSION << "\n"; @@ -1334,4 +1404,3 @@ int main(int ac, char** av) { }); }); } - diff --git a/apps/memcached/memcached.hh b/apps/memcached/memcached.hh index c74198627a..54fa217fcc 100644 --- a/apps/memcached/memcached.hh +++ b/apps/memcached/memcached.hh @@ -22,6 +22,7 @@ namespace memcache { +class item; class cache; class item_key { From 67560bec4888521766953b0867aedf522f46ec8b Mon Sep 17 00:00:00 2001 From: "Raphael S. Carvalho" Date: Wed, 25 Feb 2015 19:32:57 -0300 Subject: [PATCH 09/15] test: Add slab allocator testcase Signed-off-by: Raphael S. Carvalho --- configure.py | 2 + tests/slab_test.cc | 123 +++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 125 insertions(+) create mode 100644 tests/slab_test.cc diff --git a/configure.py b/configure.py index eb86aa27ee..e20548ecb4 100755 --- a/configure.py +++ b/configure.py @@ -102,6 +102,7 @@ tests = [ 'tests/output_stream_test', 'tests/udp_zero_copy', 'tests/shared_ptr_test', + 'tests/slab_test' ] apps = [ @@ -224,6 +225,7 @@ deps = { 'tests/output_stream_test': ['tests/output_stream_test.cc'] + core + libnet, 'tests/udp_zero_copy': ['tests/udp_zero_copy.cc'] + core + libnet, 'tests/shared_ptr_test': ['tests/shared_ptr_test.cc'] + core, + 'tests/slab_test': ['tests/slab_test.cc'] + core, } warnings = [ diff --git a/tests/slab_test.cc b/tests/slab_test.cc new file mode 100644 index 0000000000..6b3aa6003d --- /dev/null +++ b/tests/slab_test.cc @@ -0,0 +1,123 @@ +/* + * This file is open source software, licensed to you under the terms + * of the Apache License, Version 2.0 (the "License"). See the NOTICE file + * distributed with this work for additional information regarding copyright + * ownership. You may not use this file except in compliance with the License. + * + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +/* + * Copyright (C) 2015 Cloudius Systems, Ltd. + * + * To compile: g++ -std=c++14 slab_test.cc + */ + +#include +#include +#include "core/slab.hh" + +static constexpr size_t max_object_size = 1024*1024; + +class item : public slab_item_base { +public: + bi::list_member_hook<> _cache_link; + uint8_t _slab_class_id; + + item(uint8_t slab_class_id) : _slab_class_id(slab_class_id) {} + + const uint8_t get_slab_class_id() { + return _slab_class_id; + } + const bool is_unlocked() { + return true; + } +}; + +template +static void free_vector(slab_allocator& slab, std::vector& items) { + for (auto item : items) { + slab.free(item); + } +} + +static void test_allocation_1(const double growth_factor, const unsigned slab_limit_size) { + slab_allocator slab(growth_factor, slab_limit_size, max_object_size); + size_t size = max_object_size; + + slab.print_slab_classes(); + + std::vector items; + + assert(slab_limit_size % size == 0); + for (auto i = 0u; i < (slab_limit_size / size); i++) { + auto item = slab.create(size); + items.push_back(item); + } + assert(slab.create(size) == nullptr); + + free_vector(slab, items); + std::cout << __FUNCTION__ << " done!\n"; +} + +static void test_allocation_2(const double growth_factor, const unsigned slab_limit_size) { + slab_allocator slab(growth_factor, slab_limit_size, max_object_size); + size_t size = 1024; + + std::vector items; + + auto allocations = 0u; + for (;;) { + auto item = slab.create(size); + if (!item) { + break; + } + items.push_back(item); + allocations++; + } + + auto class_size = slab.class_size(size); + auto per_slab_page = max_object_size / class_size; + auto available_slab_pages = slab_limit_size / max_object_size; + assert(allocations == (per_slab_page * available_slab_pages)); + + free_vector(slab, items); + std::cout << __FUNCTION__ << " done!\n"; +} + +static void test_allocation_with_lru(const double growth_factor, const unsigned slab_limit_size) { + bi::list, &item::_cache_link>> _cache; + unsigned evictions = 0; + + slab_allocator slab(growth_factor, slab_limit_size, max_object_size, + [&](item& item_ref) { _cache.erase(_cache.iterator_to(item_ref)); evictions++; }); + size_t size = max_object_size; + + auto max = slab_limit_size / max_object_size; + for (auto i = 0u; i < max * 1000; i++) { + auto item = slab.create(size); + assert(item != nullptr); + _cache.push_front(*item); + } + assert(evictions == max * 999); + + _cache.clear(); + + std::cout << __FUNCTION__ << " done!\n"; +} + +int main(int ac, char** av) { + test_allocation_1(1.25, 5*1024*1024); + test_allocation_2(1.07, 5*1024*1024); // 1.07 is the growth factor used by facebook. + test_allocation_with_lru(1.25, 5*1024*1024); + + return 0; +} From 1492ae9040f451d098ab63a3cb70fc0e0ac8a6f6 Mon Sep 17 00:00:00 2001 From: Dor Laor Date: Thu, 26 Feb 2015 12:27:49 +0200 Subject: [PATCH 10/15] README: Add explanation about 100% cpu consumption and recommended configuration --- README.md | 23 +++++++++++++++++++++++ 1 file changed, 23 insertions(+) diff --git a/README.md b/README.md index dcf61804cb..4d01a2061a 100644 --- a/README.md +++ b/README.md @@ -312,3 +312,26 @@ void f() { } } ``` + +### Setup notes + +SeaStar is a high perofrmance framework and tuned to get the best +performance by default. As such, we're tuned towards polling vs interrupt +driven. Our assumption is that applications written for SeaStar will be +busy handling 100,000 IOPS and beyond. Polling means that each of our +cores will consume 100% cpu even when no work is given to it. + + +Recommended hardware configuration for SeaStar +---------------------------------------------- + +CPUs - As much as you need. SeaStar is highly friendly for multi-core and NUMA +NICs - As fast as possible, we recommend 10G or 40G cards. It's possible to use + 1G to but you may be limited by their capacity. + In addition, the more hardware queue per cpu the better for SeaStar. + Otherwise we have to emulate that in software. +Disks - Fast SSDs with high number of IOPS. +Client machines - Usually a single client machine can't load our servers. + Both memaslap (memcached) and WRK (httpd) cannot over load their matching + server counter parts. We recommend running the client on different machine + than the servers and use several of them. From 381717a66e60265a70f52588bf8551ff96c9ce6f Mon Sep 17 00:00:00 2001 From: Dor Laor Date: Thu, 26 Feb 2015 12:47:30 +0200 Subject: [PATCH 11/15] Run ispell on README.md --- README.md | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/README.md b/README.md index 4d01a2061a..9ceb085a4c 100644 --- a/README.md +++ b/README.md @@ -70,7 +70,7 @@ sudo apt-get install libaio-dev ninja-build ragel libhwloc-dev libnuma-dev libpc Installing GCC 4.9 for gnu++1y. Unlike the Fedora case above, this will not harm the existing installation of GCC 4.8, and will install an additional set of compilers, and additional commands named gcc-4.9, -g++-4.9, etc., that need to be used explictly, while the "gcc", "g++", +g++-4.9, etc., that need to be used explicitly, while the "gcc", "g++", etc., commands continue to point to the 4.8 versions. ``` @@ -183,7 +183,7 @@ void f() { } ``` -Here, we initate a *get()* operation, requesting that when it completes, a +Here, we initiate a *get()* operation, requesting that when it completes, a *put()* operation will be scheduled with an incremented value. We also request that when the *put()* completes, some text will be printed out. @@ -255,7 +255,7 @@ code to it. After the I/O operation initiated by `put()` completes, it calls the continuation associated with `f12`, which simply tells it to call the -continuation assoicated with `f2`. This continuation simply calls +continuation associated with `f2`. This continuation simply calls `loop_to()`. Both `f12` and `f2` are freed. `loop_to()` then calls `get()`, which starts the process all over again, allocating new versions of `f1` and `f2`. @@ -290,7 +290,7 @@ void f() { } ``` -When the `get_ex` variable is called as a function, it will rethrow +When the `get_ex` variable is called as a function, it will re-throw the exception that aborted processing, and you can then apply any needed error handling. It is essentially a transformation of From 6f36fd48addbc954e5389121f3fe909e4b72efae Mon Sep 17 00:00:00 2001 From: Dor Laor Date: Thu, 26 Feb 2015 12:53:40 +0200 Subject: [PATCH 12/15] Tiny nitpicks in readme file --- README.md | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/README.md b/README.md index 9ceb085a4c..0dcb65d038 100644 --- a/README.md +++ b/README.md @@ -315,7 +315,7 @@ void f() { ### Setup notes -SeaStar is a high perofrmance framework and tuned to get the best +SeaStar is a high performance framework and tuned to get the best performance by default. As such, we're tuned towards polling vs interrupt driven. Our assumption is that applications written for SeaStar will be busy handling 100,000 IOPS and beyond. Polling means that each of our @@ -325,13 +325,13 @@ cores will consume 100% cpu even when no work is given to it. Recommended hardware configuration for SeaStar ---------------------------------------------- -CPUs - As much as you need. SeaStar is highly friendly for multi-core and NUMA -NICs - As fast as possible, we recommend 10G or 40G cards. It's possible to use +* CPUs - As much as you need. SeaStar is highly friendly for multi-core and NUMA +* NICs - As fast as possible, we recommend 10G or 40G cards. It's possible to use 1G to but you may be limited by their capacity. In addition, the more hardware queue per cpu the better for SeaStar. Otherwise we have to emulate that in software. -Disks - Fast SSDs with high number of IOPS. -Client machines - Usually a single client machine can't load our servers. +* Disks - Fast SSDs with high number of IOPS. +* Client machines - Usually a single client machine can't load our servers. Both memaslap (memcached) and WRK (httpd) cannot over load their matching server counter parts. We recommend running the client on different machine than the servers and use several of them. From 9861bfca015ec8503f68e12ceb53690135053379 Mon Sep 17 00:00:00 2001 From: Glauber Costa Date: Thu, 26 Feb 2015 10:19:54 -0500 Subject: [PATCH 13/15] open_file_dma: allow the specification of open flags It is sometimes frustrating to use open_file_dma, because it has the hardcoded behavior of always assuming O_CREAT. Sometimes this is not desirable, and it would be nice to have the option not to do so. Note that, by design, I am only including in the open_flags enum things that we want the user of the API to control. Stuff like O_DIRECT should not be optional, and therefore is not included in the visible interface. Because of that I am changing the function signature to include a paramater that specifies whether or not we should create the file if it does not exist. Signed-off-by: Glauber Costa --- core/reactor.cc | 6 +++--- core/reactor.hh | 14 +++++++++++++- tests/blkdiscard_test.cc | 2 +- tests/fileiotest.cc | 2 +- tests/linecount.cc | 2 +- 5 files changed, 19 insertions(+), 7 deletions(-) diff --git a/core/reactor.cc b/core/reactor.cc index 194db7beac..8ce4dff5fc 100644 --- a/core/reactor.cc +++ b/core/reactor.cc @@ -328,9 +328,9 @@ posix_file_impl::read_dma(uint64_t pos, std::vector iov) { } future -reactor::open_file_dma(sstring name) { - return _thread_pool.submit>([name] { - return wrap_syscall(::open(name.c_str(), O_DIRECT | O_CLOEXEC | O_CREAT | O_RDWR, S_IRWXU)); +reactor::open_file_dma(sstring name, open_flags flags) { + return _thread_pool.submit>([name, flags] { + return wrap_syscall(::open(name.c_str(), O_DIRECT | O_CLOEXEC | static_cast(flags), S_IRWXU)); }).then([] (syscall_result sr) { sr.throw_if_error(); return make_ready_future(file(sr.result)); diff --git a/core/reactor.hh b/core/reactor.hh index 1a3c792734..4eba12bf4d 100644 --- a/core/reactor.hh +++ b/core/reactor.hh @@ -61,6 +61,7 @@ #include "file.hh" #include "semaphore.hh" #include "core/scattered_message.hh" +#include "core/enum.hh" #ifdef HAVE_OSV #include @@ -598,6 +599,17 @@ public: }; #endif /* HAVE_OSV */ +enum class open_flags { + rw = O_RDWR, + ro = O_RDONLY, + wo = O_WRONLY, + create = O_CREAT, +}; + +inline open_flags operator|(open_flags a, open_flags b) { + return open_flags(static_cast(a) | static_cast(b)); +} + class reactor { private: struct pollfn { @@ -726,7 +738,7 @@ public: future<> write_all(pollable_fd_state& fd, const void* buffer, size_t size); - future open_file_dma(sstring name); + future open_file_dma(sstring name, open_flags flags); future open_directory(sstring name); template diff --git a/tests/blkdiscard_test.cc b/tests/blkdiscard_test.cc index b40e65b774..cd0a23a7ac 100644 --- a/tests/blkdiscard_test.cc +++ b/tests/blkdiscard_test.cc @@ -42,7 +42,7 @@ int main(int ac, char** av) { auto&& config = app.configuration(); auto filepath = config["dev"].as(); - engine().open_file_dma(filepath).then([] (file f) { + engine().open_file_dma(filepath, open_flags::rw | open_flags::create).then([] (file f) { auto ft = new file_test{std::move(f)}; ft->f.stat().then([ft] (struct stat st) mutable { diff --git a/tests/fileiotest.cc b/tests/fileiotest.cc index a58910ebcf..6f42e1d0c5 100644 --- a/tests/fileiotest.cc +++ b/tests/fileiotest.cc @@ -31,7 +31,7 @@ struct file_test { int main(int ac, char** av) { static constexpr auto max = 10000; - engine().open_file_dma("testfile.tmp").then([] (file f) { + engine().open_file_dma("testfile.tmp", open_flags::rw | open_flags::create).then([] (file f) { auto ft = new file_test{std::move(f)}; for (size_t i = 0; i < max; ++i) { ft->par.wait().then([ft, i] { diff --git a/tests/linecount.cc b/tests/linecount.cc index e035dbc77c..d96441be3b 100644 --- a/tests/linecount.cc +++ b/tests/linecount.cc @@ -52,7 +52,7 @@ int main(int ac, char** av) { }); app.run(ac, av, [&app] { auto fname = app.configuration()["file"].as(); - engine().open_file_dma(fname).then([] (file f) { + engine().open_file_dma(fname, open_flags::rw | open_flags::create).then([] (file f) { auto r = make_shared(std::move(f)); r->is.consume(*r).then([r] { print("%d lines\n", r->count); From 6b77cf7d080f9a429768f36c27c831dabdbbe860 Mon Sep 17 00:00:00 2001 From: Glauber Costa Date: Thu, 26 Feb 2015 10:19:55 -0500 Subject: [PATCH 14/15] linecount: change rw to ro Avi says it is enough. Signed-off-by: Glauber Costa --- tests/linecount.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/linecount.cc b/tests/linecount.cc index d96441be3b..5f15ed8548 100644 --- a/tests/linecount.cc +++ b/tests/linecount.cc @@ -52,7 +52,7 @@ int main(int ac, char** av) { }); app.run(ac, av, [&app] { auto fname = app.configuration()["file"].as(); - engine().open_file_dma(fname, open_flags::rw | open_flags::create).then([] (file f) { + engine().open_file_dma(fname, open_flags::ro | open_flags::create).then([] (file f) { auto r = make_shared(std::move(f)); r->is.consume(*r).then([r] { print("%d lines\n", r->count); From 1fadd7d608c89fbe9ab46332db23395a7a5bf0a7 Mon Sep 17 00:00:00 2001 From: Tomasz Grabiec Date: Thu, 26 Feb 2015 16:31:56 +0100 Subject: [PATCH 15/15] net: Move ipv4_addr constructor to the source file boost::join() provided by boost/algorithm/string.hpp conflicts with boost::join() from boost/range/join.hpp. It looks like a boost issue but let's not pollute the namespace unnecesssarily. Regarding the change in configure.py, it looks like scollectd.cc is part of the 'core' package, but it needs 'net/api.hh', so I added 'net/net.cc' to core. --- configure.py | 4 ++-- core/posix.hh | 1 + net/api.hh | 17 +---------------- net/net.cc | 16 ++++++++++++++++ 4 files changed, 20 insertions(+), 18 deletions(-) diff --git a/configure.py b/configure.py index e20548ecb4..8d42481e5e 100755 --- a/configure.py +++ b/configure.py @@ -142,7 +142,6 @@ libnet = [ 'net/proxy.cc', 'net/virtio.cc', 'net/dpdk.cc', - 'net/net.cc', 'net/ip.cc', 'net/ethernet.cc', 'net/arp.cc', @@ -165,7 +164,8 @@ core = [ 'util/conversions.cc', 'net/packet.cc', 'net/posix-stack.cc', - 'tests/test_runner.cc' + 'tests/test_runner.cc', + 'net/net.cc', ] defines = [] diff --git a/core/posix.hh b/core/posix.hh index 476ec9a24e..7044ca7dde 100644 --- a/core/posix.hh +++ b/core/posix.hh @@ -24,6 +24,7 @@ #include "sstring.hh" #include +#include #include #include #include diff --git a/net/api.hh b/net/api.hh index f68652d2b1..d1f75f8035 100644 --- a/net/api.hh +++ b/net/api.hh @@ -33,8 +33,6 @@ #include #include #include -#include -#include class socket_address { public: @@ -58,20 +56,7 @@ struct ipv4_addr { ipv4_addr() : ip(0), port(0) {} ipv4_addr(uint32_t ip, uint16_t port) : ip(ip), port(port) {} ipv4_addr(uint16_t port) : ip(0), port(port) {} - - ipv4_addr(const std::string &addr) { - std::vector items; - boost::split(items, addr, boost::is_any_of(":")); - if (items.size() == 1) { - ip = boost::asio::ip::address_v4::from_string(addr).to_ulong(); - port = 0; - } else if (items.size() == 2) { - ip = boost::asio::ip::address_v4::from_string(items[0]).to_ulong(); - port = std::stoul(items[1]); - } else { - throw std::invalid_argument("invalid format: " + addr); - } - } + ipv4_addr(const std::string &addr); ipv4_addr(const socket_address &sa) { ip = net::ntoh(sa.u.in.sin_addr.s_addr); diff --git a/net/net.cc b/net/net.cc index c9039037e2..4a732446fc 100644 --- a/net/net.cc +++ b/net/net.cc @@ -20,12 +20,28 @@ * */ +#include +#include #include "net.hh" #include #include "toeplitz.hh" using std::move; +ipv4_addr::ipv4_addr(const std::string &addr) { + std::vector items; + boost::split(items, addr, boost::is_any_of(":")); + if (items.size() == 1) { + ip = boost::asio::ip::address_v4::from_string(addr).to_ulong(); + port = 0; + } else if (items.size() == 2) { + ip = boost::asio::ip::address_v4::from_string(items[0]).to_ulong(); + port = std::stoul(items[1]); + } else { + throw std::invalid_argument("invalid format: " + addr); + } +} + namespace net { inline