From e57be410b5f3f06c2c9a723ad32c8f3f34f420b1 Mon Sep 17 00:00:00 2001 From: Avi Kivity Date: Thu, 15 Jan 2015 12:07:09 +0200 Subject: [PATCH 1/5] memory: support cross-cpu freeing Currently we require that memory be freed on the same cpu it was allocated. This does not impose difficulties on the user code, since our code is already smp-unsafe, and so must use message-passing to run the destructor on the origin cpu, so memory is naturally freed there as well. However, library code does not run under our assumptions, specifically std::exception_ptr, which we do transport across cores. To support this use case, add low-performance support for cross-cpu frees, using an atomic singly linked list per core. --- core/memory.cc | 67 ++++++++++++++++++++++++++++++++++++++++++++++---- core/memory.hh | 12 +++++++-- 2 files changed, 72 insertions(+), 7 deletions(-) diff --git a/core/memory.cc b/core/memory.cc index 2ca58dbc23..0b7d55428a 100644 --- a/core/memory.cc +++ b/core/memory.cc @@ -60,6 +60,8 @@ static constexpr const size_t page_bits = 12; static constexpr const size_t page_size = 1 << page_bits; static constexpr const size_t huge_page_size = 512 * page_size; static constexpr const unsigned cpu_id_shift = 36; // FIXME: make dynamic +static constexpr const unsigned max_cpus = 256; +static constexpr const size_t cache_line_size = 64; using pageidx = uint32_t; @@ -68,6 +70,7 @@ class page_list; static thread_local uint64_t g_allocs; static thread_local uint64_t g_frees; +static thread_local uint64_t g_cross_cpu_frees; using std::experimental::optional; @@ -76,6 +79,11 @@ using allocate_system_memory_fn namespace bi = boost::intrusive; +inline +unsigned object_cpu_id(void* ptr) { + return (reinterpret_cast(ptr) >> cpu_id_shift) & 0xff; +} + class page_list_link { uint32_t _prev; uint32_t _next; @@ -226,6 +234,10 @@ public: static constexpr const size_t max_small_allocation = small_pool::idx_to_size(small_pool_array::nr_small_pools - 1); +struct cross_cpu_free_item { + 
cross_cpu_free_item* next; +}; + struct cpu_pages { static constexpr unsigned min_free_pages = 20000000 / page_size; char* memory; @@ -249,7 +261,9 @@ struct cpu_pages { page_list free_spans[nr_span_lists]; // contains spans with span_size >= 2^idx } fsu; small_pool_array small_pools; + alignas(cache_line_size) std::atomic xcpu_freelist; static std::atomic cpu_id_gen; + static cpu_pages* all_cpus[max_cpus]; char* mem() { return memory; } void link(page_list& list, page* span); @@ -268,6 +282,8 @@ struct cpu_pages { void* allocate_small(unsigned size); void free(void* ptr); void free(void* ptr, size_t size); + void free_cross_cpu(unsigned cpu_id, void* ptr); + bool drain_cross_cpu_freelist(); size_t object_size(void* ptr); page* to_page(void* p) { return &pages[(reinterpret_cast(p) - mem()) / page_size]; @@ -283,6 +299,7 @@ struct cpu_pages { static thread_local cpu_pages cpu_mem; std::atomic cpu_pages::cpu_id_gen; +cpu_pages* cpu_pages::all_cpus[max_cpus]; // Free spans are store in the largest index i such that nr_pages >= 1 << i. 
static inline @@ -379,6 +396,7 @@ cpu_pages::allocate_large_and_trim(unsigned n_pages, Trimmer trimmer) { span->span_size = span_end->span_size = n_pages; span->pool = nullptr; if (nr_free_pages < current_min_free_pages) { + drain_cross_cpu_freelist(); reclaim(); } return mem() + span_idx * page_size; @@ -422,8 +440,34 @@ size_t cpu_pages::object_size(void* ptr) { } } +void cpu_pages::free_cross_cpu(unsigned cpu_id, void* ptr) { + auto p = reinterpret_cast(ptr); + auto& list = all_cpus[cpu_id]->xcpu_freelist; + auto old = list.load(std::memory_order_relaxed); + do { + p->next = old; + } while (!list.compare_exchange_weak(old, p, std::memory_order_release, std::memory_order_relaxed)); + ++g_cross_cpu_frees; +} + +bool cpu_pages::drain_cross_cpu_freelist() { + if (!xcpu_freelist.load(std::memory_order_relaxed)) { + return false; + } + auto p = xcpu_freelist.exchange(nullptr, std::memory_order_acquire); + while (p) { + auto n = p->next; + free(p); + p = n; + } + return true; +} + void cpu_pages::free(void* ptr) { - assert(((reinterpret_cast(ptr) >> cpu_id_shift) & 0xff) == cpu_id); + auto obj_cpu = object_cpu_id(ptr); + if (obj_cpu != cpu_id) { + return free_cross_cpu(obj_cpu, ptr); + } page* span = to_page(ptr); if (span->pool) { span->pool->deallocate(ptr); @@ -433,7 +477,10 @@ void cpu_pages::free(void* ptr) { } void cpu_pages::free(void* ptr, size_t size) { - assert(((reinterpret_cast(ptr) >> cpu_id_shift) & 0xff) == cpu_id); + auto obj_cpu = object_cpu_id(ptr); + if (obj_cpu != cpu_id) { + return free_cross_cpu(obj_cpu, ptr); + } if (size <= max_small_allocation) { auto pool = &small_pools[small_pool::size_to_idx(size)]; pool->deallocate(ptr); @@ -447,6 +494,8 @@ bool cpu_pages::initialize() { return false; } cpu_id = cpu_id_gen.fetch_add(1, std::memory_order_relaxed); + assert(cpu_id < max_cpus); + all_cpus[cpu_id] = this; auto base = mem_base() + (size_t(cpu_id) << cpu_id_shift); auto size = 32 << 20; // Small size for bootstrap auto r = ::mmap(base, size, @@ 
-680,7 +729,7 @@ void free_large(void* ptr) { } size_t object_size(void* ptr) { - return cpu_mem.object_size(ptr); + return cpu_pages::all_cpus[object_cpu_id(ptr)]->object_size(ptr); } void* allocate(size_t size) { @@ -764,7 +813,11 @@ void configure(std::vector m, } statistics stats() { - return statistics{g_allocs, g_frees}; + return statistics{g_allocs, g_frees, g_cross_cpu_frees}; +} + +bool drain_cross_cpu_freelist() { + return cpu_mem.drain_cross_cpu_freelist(); } } @@ -1006,7 +1059,11 @@ void configure(std::vector m, std::experimental::optional Date: Thu, 15 Jan 2015 12:11:03 +0200 Subject: [PATCH 2/5] reactor: periodically call into allocator to drain cross-cpu free items --- core/reactor.cc | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/core/reactor.cc b/core/reactor.cc index 9e3f836609..8ccce7f703 100644 --- a/core/reactor.cc +++ b/core/reactor.cc @@ -625,6 +625,13 @@ reactor::register_collectd_metrics() { scollectd::make_typed(scollectd::data_type::DERIVE, [] { return memory::stats().frees(); }) ), + scollectd::add_polled_metric( + scollectd::type_instance_id("memory", + scollectd::per_cpu_plugin_instance, + "total_operations", "cross_cpu_free"), + scollectd::make_typed(scollectd::data_type::DERIVE, + [] { return memory::stats().cross_cpu_frees(); }) + ), scollectd::add_polled_metric( scollectd::type_instance_id("memory", scollectd::per_cpu_plugin_instance, @@ -703,6 +710,10 @@ int reactor::run() { } ); + poller drain_cross_cpu_freelist([] { + return memory::drain_cross_cpu_freelist(); + }); + poller expire_lowres_timers([this] { if (_lowres_next_timeout == lowres_clock::time_point()) { return true; From 311e1c834e55e3a70c0d3bfa9d23044a8f883d7b Mon Sep 17 00:00:00 2001 From: Avi Kivity Date: Thu, 15 Jan 2015 14:22:13 +0200 Subject: [PATCH 3/5] future: fix future::then_wrapped() in exception case Calling state.get() will throw the exception instead of calling the function, thus denying the called function the chance to deal with the 
exception. Fix by constructing the future directly from state. --- core/future.hh | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/core/future.hh b/core/future.hh index e458f4c939..902e1632e2 100644 --- a/core/future.hh +++ b/core/future.hh @@ -353,6 +353,9 @@ private: future(exception_future_marker, std::exception_ptr ex) noexcept : _promise(nullptr) { _local_state.set_exception(std::move(ex)); } + explicit future(future_state&& state) noexcept + : _promise(nullptr), _local_state(std::move(state)) { + } future_state* state() noexcept { return _promise ? _promise->_state : &_local_state; } @@ -495,7 +498,7 @@ public: auto next_fut = pr.get_future(); _promise->schedule([func = std::forward(func), pr = std::move(pr)] (auto& state) mutable { try { - auto next_fut = func(future(ready_future_from_tuple_marker(), state.get())); + auto next_fut = func(future(std::move(state))); next_fut.forward_to(std::move(pr)); } catch (...) { pr.set_exception(std::current_exception()); From e746eb7f84d7fff3f5872bcd79dd557961b1616c Mon Sep 17 00:00:00 2001 From: Avi Kivity Date: Thu, 15 Jan 2015 14:23:23 +0200 Subject: [PATCH 4/5] future: fix make_exception_future build error Add missing std::. --- core/future.hh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/future.hh b/core/future.hh index 902e1632e2..803986478e 100644 --- a/core/future.hh +++ b/core/future.hh @@ -646,7 +646,7 @@ future make_exception_future(std::exception_ptr ex) noexcept { template inline future make_exception_future(Exception&& ex) noexcept { - return make_exception_future(make_exception_ptr(std::forward(ex))); + return make_exception_future(std::make_exception_ptr(std::forward(ex))); } #endif /* FUTURE_HH_ */ From 4c3eb49aaf2dbfc984bee0caa03378696b119d4a Mon Sep 17 00:00:00 2001 From: Avi Kivity Date: Thu, 15 Jan 2015 14:24:31 +0200 Subject: [PATCH 5/5] tests: add smp test Test calling a function, and calling a function that returns an exception. 
The second test verifies that cross cpu free works. --- configure.py | 2 ++ test.py | 1 + tests/smp_test.cc | 61 +++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 64 insertions(+) create mode 100644 tests/smp_test.cc diff --git a/configure.py b/configure.py index 61cff36565..df941147a0 100755 --- a/configure.py +++ b/configure.py @@ -72,6 +72,7 @@ tests = [ 'tests/timertest', 'tests/tcp_test', 'tests/futures_test', + 'tests/smp_test', 'tests/udp_server', 'tests/udp_client', 'tests/blkdiscard_test', @@ -194,6 +195,7 @@ deps = { 'tests/tcp_test': ['tests/tcp_test.cc'] + core + libnet, 'tests/timertest': ['tests/timertest.cc'] + core, 'tests/futures_test': ['tests/futures_test.cc'] + core, + 'tests/smp_test': ['tests/smp_test.cc'] + core, 'tests/udp_server': ['tests/udp_server.cc'] + core + libnet, 'tests/udp_client': ['tests/udp_client.cc'] + core + libnet, 'tests/tcp_server': ['tests/tcp_server.cc'] + core + libnet, diff --git a/test.py b/test.py index ca6f9325f4..4359a8198f 100755 --- a/test.py +++ b/test.py @@ -6,6 +6,7 @@ import subprocess all_tests = [ 'futures_test', + 'smp_test', 'memcached/test_ascii_parser', 'sstring_test', 'output_stream_test', diff --git a/tests/smp_test.cc b/tests/smp_test.cc new file mode 100644 index 0000000000..6234c23917 --- /dev/null +++ b/tests/smp_test.cc @@ -0,0 +1,61 @@ +/* + * Copyright (C) 2014 Cloudius Systems, Ltd. 
+ */
+
+#include "core/reactor.hh"
+#include "core/app-template.hh"
+#include "core/print.hh"
+
+future<bool> test_smp_call() {
+    return smp::submit_to(1, [] {
+        return make_ready_future<int>(3);
+    }).then([] (int ret) {
+        return make_ready_future<bool>(ret == 3);
+    });
+}
+
+struct nasty_exception {};
+
+future<bool> test_smp_exception() {
+    print("1\n");
+    return smp::submit_to(1, [] {
+        print("2\n");
+        auto x = make_exception_future<int>(nasty_exception());
+        print("3\n");
+        return x;
+    }).then_wrapped([] (future<int> result) {
+        print("4\n");
+        try {
+            result.get();
+            return make_ready_future<bool>(false); // expected an exception
+        } catch (nasty_exception&) {
+            // all is well
+            return make_ready_future<bool>(true);
+        } catch (...) {
+            // incorrect exception type
+            return make_ready_future<bool>(false);
+        }
+    });
+}
+
+int tests, fails;
+
+future<>
+report(sstring msg, future<bool>&& result) {
+    return std::move(result).then([msg] (bool result) {
+        print("%s: %s\n", (result ? "PASS" : "FAIL"), msg);
+        tests += 1;
+        fails += !result;
+    });
+}
+
+int main(int ac, char** av) {
+    return app_template().run(ac, av, [] {
+        return report("smp call", test_smp_call()).then([] {
+            return report("smp exception", test_smp_exception());
+        }).then([] {
+            print("\n%d tests / %d failures\n", tests, fails);
+            engine.exit(fails ? 1 : 0);
+        });
+    });
+}