diff --git a/configure.py b/configure.py index 61cff36565..df941147a0 100755 --- a/configure.py +++ b/configure.py @@ -72,6 +72,7 @@ tests = [ 'tests/timertest', 'tests/tcp_test', 'tests/futures_test', + 'tests/smp_test', 'tests/udp_server', 'tests/udp_client', 'tests/blkdiscard_test', @@ -194,6 +195,7 @@ deps = { 'tests/tcp_test': ['tests/tcp_test.cc'] + core + libnet, 'tests/timertest': ['tests/timertest.cc'] + core, 'tests/futures_test': ['tests/futures_test.cc'] + core, + 'tests/smp_test': ['tests/smp_test.cc'] + core, 'tests/udp_server': ['tests/udp_server.cc'] + core + libnet, 'tests/udp_client': ['tests/udp_client.cc'] + core + libnet, 'tests/tcp_server': ['tests/tcp_server.cc'] + core + libnet, diff --git a/core/future.hh b/core/future.hh index e458f4c939..803986478e 100644 --- a/core/future.hh +++ b/core/future.hh @@ -353,6 +353,9 @@ private: future(exception_future_marker, std::exception_ptr ex) noexcept : _promise(nullptr) { _local_state.set_exception(std::move(ex)); } + explicit future(future_state&& state) noexcept + : _promise(nullptr), _local_state(std::move(state)) { + } future_state* state() noexcept { return _promise ? _promise->_state : &_local_state; } @@ -495,7 +498,7 @@ public: auto next_fut = pr.get_future(); _promise->schedule([func = std::forward(func), pr = std::move(pr)] (auto& state) mutable { try { - auto next_fut = func(future(ready_future_from_tuple_marker(), state.get())); + auto next_fut = func(future(std::move(state))); next_fut.forward_to(std::move(pr)); } catch (...) { pr.set_exception(std::current_exception()); @@ -643,7 +646,7 @@ future make_exception_future(std::exception_ptr ex) noexcept { template inline future make_exception_future(Exception&& ex) noexcept { - return make_exception_future(make_exception_ptr(std::forward(ex))); + return make_exception_future(std::make_exception_ptr(std::forward(ex))); } #endif /* FUTURE_HH_ */ diff --git a/core/memory.cc b/core/memory.cc index 2ca58dbc23..0b7d55428a 100644 --- a/core/memory.cc +++ b/core/memory.cc @@ -60,6 +60,8 @@ static constexpr const size_t page_bits = 12; static constexpr const size_t page_size = 1 << page_bits; static constexpr const size_t huge_page_size = 512 * page_size; static constexpr const unsigned cpu_id_shift = 36; // FIXME: make dynamic +static constexpr const unsigned max_cpus = 256; +static constexpr const size_t cache_line_size = 64; using pageidx = uint32_t; @@ -68,6 +70,7 @@ class page_list; static thread_local uint64_t g_allocs; static thread_local uint64_t g_frees; +static thread_local uint64_t g_cross_cpu_frees; using std::experimental::optional; @@ -76,6 +79,11 @@ using allocate_system_memory_fn namespace bi = boost::intrusive; +inline +unsigned object_cpu_id(void* ptr) { + return (reinterpret_cast(ptr) >> cpu_id_shift) & 0xff; +} + class page_list_link { uint32_t _prev; uint32_t _next; @@ -226,6 +234,10 @@ public: static constexpr const size_t max_small_allocation = small_pool::idx_to_size(small_pool_array::nr_small_pools - 1); +struct cross_cpu_free_item { + cross_cpu_free_item* next; +}; + struct cpu_pages { static constexpr unsigned min_free_pages = 20000000 / page_size; char* memory; @@ -249,7 +261,9 @@ struct cpu_pages { page_list free_spans[nr_span_lists]; // contains spans with span_size >= 2^idx } fsu; small_pool_array small_pools; + alignas(cache_line_size) std::atomic xcpu_freelist; static std::atomic cpu_id_gen; + static cpu_pages* all_cpus[max_cpus]; char* mem() { return memory; } void link(page_list& list, page* span); @@ -268,6 +282,8 @@ struct cpu_pages { void* allocate_small(unsigned size); void free(void* ptr); void free(void* ptr, size_t size); + void free_cross_cpu(unsigned cpu_id, void* ptr); + bool drain_cross_cpu_freelist(); size_t object_size(void* ptr); page* to_page(void* p) { return &pages[(reinterpret_cast(p) - mem()) / page_size]; @@ -283,6 +299,7 @@ struct cpu_pages { static thread_local cpu_pages cpu_mem; std::atomic cpu_pages::cpu_id_gen; +cpu_pages* cpu_pages::all_cpus[max_cpus]; // Free spans are store in the largest index i such that nr_pages >= 1 << i. static inline @@ -379,6 +396,7 @@ cpu_pages::allocate_large_and_trim(unsigned n_pages, Trimmer trimmer) { span->span_size = span_end->span_size = n_pages; span->pool = nullptr; if (nr_free_pages < current_min_free_pages) { + drain_cross_cpu_freelist(); reclaim(); } return mem() + span_idx * page_size; @@ -422,8 +440,34 @@ size_t cpu_pages::object_size(void* ptr) { } } +void cpu_pages::free_cross_cpu(unsigned cpu_id, void* ptr) { + auto p = reinterpret_cast(ptr); + auto& list = all_cpus[cpu_id]->xcpu_freelist; + auto old = list.load(std::memory_order_relaxed); + do { + p->next = old; + } while (!list.compare_exchange_weak(old, p, std::memory_order_release, std::memory_order_relaxed)); + ++g_cross_cpu_frees; +} + +bool cpu_pages::drain_cross_cpu_freelist() { + if (!xcpu_freelist.load(std::memory_order_relaxed)) { + return false; + } + auto p = xcpu_freelist.exchange(nullptr, std::memory_order_acquire); + while (p) { + auto n = p->next; + free(p); + p = n; + } + return true; +} + void cpu_pages::free(void* ptr) { - assert(((reinterpret_cast(ptr) >> cpu_id_shift) & 0xff) == cpu_id); + auto obj_cpu = object_cpu_id(ptr); + if (obj_cpu != cpu_id) { + return free_cross_cpu(obj_cpu, ptr); + } page* span = to_page(ptr); if (span->pool) { span->pool->deallocate(ptr); @@ -433,7 +477,10 @@ void cpu_pages::free(void* ptr) { } void cpu_pages::free(void* ptr, size_t size) { - assert(((reinterpret_cast(ptr) >> cpu_id_shift) & 0xff) == cpu_id); + auto obj_cpu = object_cpu_id(ptr); + if (obj_cpu != cpu_id) { + return free_cross_cpu(obj_cpu, ptr); + } if (size <= max_small_allocation) { auto pool = &small_pools[small_pool::size_to_idx(size)]; pool->deallocate(ptr); @@ -447,6 +494,8 @@ bool cpu_pages::initialize() { return false; } cpu_id = cpu_id_gen.fetch_add(1, std::memory_order_relaxed); + assert(cpu_id < max_cpus); + all_cpus[cpu_id] = this; auto base = mem_base() + (size_t(cpu_id) << cpu_id_shift); auto size = 32 << 20; // Small size for bootstrap auto r = ::mmap(base, size, @@ -680,7 +729,7 @@ void free_large(void* ptr) { } size_t object_size(void* ptr) { - return cpu_mem.object_size(ptr); + return cpu_pages::all_cpus[object_cpu_id(ptr)]->object_size(ptr); } void* allocate(size_t size) { @@ -764,7 +813,11 @@ void configure(std::vector m, } statistics stats() { - return statistics{g_allocs, g_frees}; + return statistics{g_allocs, g_frees, g_cross_cpu_frees}; +} + +bool drain_cross_cpu_freelist() { + return cpu_mem.drain_cross_cpu_freelist(); } } @@ -1006,7 +1059,11 @@ void configure(std::vector m, std::experimental::optional test_smp_call() { + return smp::submit_to(1, [] { + return make_ready_future(3); + }).then([] (int ret) { + return make_ready_future(ret == 3); + }); +} + +struct nasty_exception {}; + +future test_smp_exception() { + print("1\n"); + return smp::submit_to(1, [] { + print("2\n"); + auto x = make_exception_future(nasty_exception()); + print("3\n"); + return x; + }).then_wrapped([] (future result) { + print("4\n"); + try { + result.get(); + return make_ready_future(false); // expected an exception + } catch (nasty_exception&) { + // all is well + return make_ready_future(true); + } catch (...) { + // incorrect exception type + return make_ready_future(false); + } + }); +} + +int tests, fails; + +future<> +report(sstring msg, future&& result) { + return std::move(result).then([msg] (bool result) { + print("%s: %s\n", (result ? "PASS" : "FAIL"), msg); + tests += 1; + fails += !result; + }); +} + +int main(int ac, char** av) { + return app_template().run(ac, av, [] { + return report("smp call", test_smp_call()).then([] { + return report("smp exception", test_smp_exception()); + }).then([] { + print("\n%d tests / %d failures\n", tests, fails); + engine.exit(fails ? 1 : 0); + }); + }); +}