Merge branch 'cross-cpu-alloc'

Library-imposed smart pointers, such as std::exception_ptr, may be freed on
a CPU other than the one they were allocated on.  Unfortunately it is
impossible to wrap std::exception_ptr, since it interacts directly with the
stack unwinder.

Fix by allowing cross-cpu frees.  It's a slow path, so do not abuse.
This commit is contained in:
Avi Kivity
2015-01-15 15:42:26 +02:00
7 changed files with 152 additions and 9 deletions

View File

@@ -72,6 +72,7 @@ tests = [
'tests/timertest',
'tests/tcp_test',
'tests/futures_test',
'tests/smp_test',
'tests/udp_server',
'tests/udp_client',
'tests/blkdiscard_test',
@@ -194,6 +195,7 @@ deps = {
'tests/tcp_test': ['tests/tcp_test.cc'] + core + libnet,
'tests/timertest': ['tests/timertest.cc'] + core,
'tests/futures_test': ['tests/futures_test.cc'] + core,
'tests/smp_test': ['tests/smp_test.cc'] + core,
'tests/udp_server': ['tests/udp_server.cc'] + core + libnet,
'tests/udp_client': ['tests/udp_client.cc'] + core + libnet,
'tests/tcp_server': ['tests/tcp_server.cc'] + core + libnet,

View File

@@ -353,6 +353,9 @@ private:
// Construct an already-failed future that is not bound to any promise;
// the exception is stored in the future's own local state.
future(exception_future_marker, std::exception_ptr ex) noexcept : _promise(nullptr) {
_local_state.set_exception(std::move(ex));
}
// Construct a ready future directly from a completed future_state,
// bypassing any promise; used to deliver a state to a continuation.
explicit future(future_state<T...>&& state) noexcept
: _promise(nullptr), _local_state(std::move(state)) {
}
// The currently active state: the linked promise's state while one is
// attached, otherwise this future's local state.
future_state<T...>* state() noexcept {
return _promise ? _promise->_state : &_local_state;
}
@@ -495,7 +498,7 @@ public:
auto next_fut = pr.get_future();
_promise->schedule([func = std::forward<Func>(func), pr = std::move(pr)] (auto& state) mutable {
try {
auto next_fut = func(future(ready_future_from_tuple_marker(), state.get()));
auto next_fut = func(future(std::move(state)));
next_fut.forward_to(std::move(pr));
} catch (...) {
pr.set_exception(std::current_exception());
@@ -643,7 +646,7 @@ future<T...> make_exception_future(std::exception_ptr ex) noexcept {
// Build an exceptional future<T...> from an arbitrary exception object by
// wrapping it in a std::exception_ptr.  std::make_exception_ptr is
// qualified explicitly; relying on unqualified lookup/ADL here is fragile.
template <typename... T, typename Exception>
inline
future<T...> make_exception_future(Exception&& ex) noexcept {
    return make_exception_future<T...>(std::make_exception_ptr(std::forward<Exception>(ex)));
}
#endif /* FUTURE_HH_ */

View File

@@ -60,6 +60,8 @@ static constexpr const size_t page_bits = 12;
static constexpr const size_t page_size = 1 << page_bits;
static constexpr const size_t huge_page_size = 512 * page_size;
static constexpr const unsigned cpu_id_shift = 36; // FIXME: make dynamic
static constexpr const unsigned max_cpus = 256;
static constexpr const size_t cache_line_size = 64;
using pageidx = uint32_t;
@@ -68,6 +70,7 @@ class page_list;
static thread_local uint64_t g_allocs;
static thread_local uint64_t g_frees;
static thread_local uint64_t g_cross_cpu_frees;
using std::experimental::optional;
@@ -76,6 +79,11 @@ using allocate_system_memory_fn
namespace bi = boost::intrusive;
// Recover the owning cpu of an allocation from its address: each cpu's
// memory area is mapped at a distinct multiple of (1 << cpu_id_shift).
inline
unsigned object_cpu_id(void* ptr) {
    auto addr = reinterpret_cast<uintptr_t>(ptr);
    return static_cast<unsigned>((addr >> cpu_id_shift) & 0xff);
}
class page_list_link {
uint32_t _prev;
uint32_t _next;
@@ -226,6 +234,10 @@ public:
static constexpr const size_t max_small_allocation
= small_pool::idx_to_size(small_pool_array::nr_small_pools - 1);
// Intrusive singly-linked list node overlaid on a freed object, used to
// queue frees destined for the cpu that owns the allocation.
struct cross_cpu_free_item {
cross_cpu_free_item* next;
};
struct cpu_pages {
static constexpr unsigned min_free_pages = 20000000 / page_size;
char* memory;
@@ -249,7 +261,9 @@ struct cpu_pages {
page_list free_spans[nr_span_lists]; // contains spans with span_size >= 2^idx
} fsu;
small_pool_array small_pools;
alignas(cache_line_size) std::atomic<cross_cpu_free_item*> xcpu_freelist;
static std::atomic<unsigned> cpu_id_gen;
static cpu_pages* all_cpus[max_cpus];
char* mem() { return memory; }
void link(page_list& list, page* span);
@@ -268,6 +282,8 @@ struct cpu_pages {
void* allocate_small(unsigned size);
void free(void* ptr);
void free(void* ptr, size_t size);
void free_cross_cpu(unsigned cpu_id, void* ptr);
bool drain_cross_cpu_freelist();
size_t object_size(void* ptr);
page* to_page(void* p) {
return &pages[(reinterpret_cast<char*>(p) - mem()) / page_size];
@@ -283,6 +299,7 @@ struct cpu_pages {
static thread_local cpu_pages cpu_mem;
std::atomic<unsigned> cpu_pages::cpu_id_gen;
cpu_pages* cpu_pages::all_cpus[max_cpus];
// Free spans are stored in the largest index i such that nr_pages >= 1 << i.
static inline
@@ -379,6 +396,7 @@ cpu_pages::allocate_large_and_trim(unsigned n_pages, Trimmer trimmer) {
span->span_size = span_end->span_size = n_pages;
span->pool = nullptr;
if (nr_free_pages < current_min_free_pages) {
drain_cross_cpu_freelist();
reclaim();
}
return mem() + span_idx * page_size;
@@ -422,8 +440,34 @@ size_t cpu_pages::object_size(void* ptr) {
}
}
// Queue a free for an object owned by another cpu.  The object itself is
// reused as the list node and pushed onto the owner's lock-free freelist;
// the owner drains the list later on its own thread.
void cpu_pages::free_cross_cpu(unsigned cpu_id, void* ptr) {
    auto item = reinterpret_cast<cross_cpu_free_item*>(ptr);
    auto& remote_list = all_cpus[cpu_id]->xcpu_freelist;
    auto head = remote_list.load(std::memory_order_relaxed);
    while (true) {
        item->next = head;
        // release: publish item->next before the node becomes visible.
        if (remote_list.compare_exchange_weak(head, item,
                std::memory_order_release, std::memory_order_relaxed)) {
            break;
        }
    }
    ++g_cross_cpu_frees;
}
// Free any objects queued on this cpu's cross-cpu freelist by other cpus.
// Returns true if any work was performed.
bool cpu_pages::drain_cross_cpu_freelist() {
    // Fast path: nothing has been queued since the last drain.
    if (xcpu_freelist.load(std::memory_order_relaxed) == nullptr) {
        return false;
    }
    // Detach the entire list atomically, then free each node locally.
    auto item = xcpu_freelist.exchange(nullptr, std::memory_order_acquire);
    while (item) {
        auto next = item->next;
        free(item);
        item = next;
    }
    return true;
}
void cpu_pages::free(void* ptr) {
assert(((reinterpret_cast<uintptr_t>(ptr) >> cpu_id_shift) & 0xff) == cpu_id);
auto obj_cpu = object_cpu_id(ptr);
if (obj_cpu != cpu_id) {
return free_cross_cpu(obj_cpu, ptr);
}
page* span = to_page(ptr);
if (span->pool) {
span->pool->deallocate(ptr);
@@ -433,7 +477,10 @@ void cpu_pages::free(void* ptr) {
}
void cpu_pages::free(void* ptr, size_t size) {
assert(((reinterpret_cast<uintptr_t>(ptr) >> cpu_id_shift) & 0xff) == cpu_id);
auto obj_cpu = object_cpu_id(ptr);
if (obj_cpu != cpu_id) {
return free_cross_cpu(obj_cpu, ptr);
}
if (size <= max_small_allocation) {
auto pool = &small_pools[small_pool::size_to_idx(size)];
pool->deallocate(ptr);
@@ -447,6 +494,8 @@ bool cpu_pages::initialize() {
return false;
}
cpu_id = cpu_id_gen.fetch_add(1, std::memory_order_relaxed);
assert(cpu_id < max_cpus);
all_cpus[cpu_id] = this;
auto base = mem_base() + (size_t(cpu_id) << cpu_id_shift);
auto size = 32 << 20; // Small size for bootstrap
auto r = ::mmap(base, size,
@@ -680,7 +729,7 @@ void free_large(void* ptr) {
}
// Return the allocated size of ptr.  The object may belong to any cpu's
// memory area, so the owning cpu_pages is looked up from the address
// instead of assuming the local cpu.  (The stale local-only return left
// over from the old code path is removed — it made the lookup unreachable.)
size_t object_size(void* ptr) {
    return cpu_pages::all_cpus[object_cpu_id(ptr)]->object_size(ptr);
}
void* allocate(size_t size) {
@@ -764,7 +813,11 @@ void configure(std::vector<resource::memory> m,
}
// Snapshot of this cpu's allocation counters, including frees forwarded
// to other cpus.  (Removes the duplicate pre-cross-cpu return statement
// that shadowed this one.)
statistics stats() {
    return statistics{g_allocs, g_frees, g_cross_cpu_frees};
}
// Public entry point: drain frees queued for the calling cpu by other
// cpus.  Returns true if any objects were actually freed.
bool drain_cross_cpu_freelist() {
return cpu_mem.drain_cross_cpu_freelist();
}
}
@@ -1006,7 +1059,11 @@ void configure(std::vector<resource::memory> m, std::experimental::optional<std:
}
// Stub for the build without the custom allocator: no per-cpu accounting
// is available, so all counters read as zero.  (Removes the leftover
// two-field return that made this one unreachable.)
statistics stats() {
    return statistics{0, 0, 0};
}
// Stub for the build without the custom allocator: there is no per-cpu
// freelist, so there is never any cross-cpu work to drain.
bool drain_cross_cpu_freelist() {
return false;
}
}

View File

@@ -25,6 +25,12 @@ public:
void do_reclaim() { _reclaim(); }
};
// Call periodically to recycle objects that were freed
// on cpu other than the one they were allocated on.
//
// Returns @true if any work was actually performed.
bool drain_cross_cpu_freelist();
// We don't want the memory code calling back into the rest of
// the system, so allow the rest of the system to tell the memory
// code how to initiate reclaim.
@@ -40,12 +46,14 @@ statistics stats();
// Per-cpu memory accounting counters; obtained via stats().
class statistics {
    uint64_t _mallocs;
    uint64_t _frees;
    uint64_t _cross_cpu_frees;
private:
    // Legacy two-argument form: delegates to the full constructor so
    // _cross_cpu_frees is never left uninitialized.
    statistics(uint64_t mallocs, uint64_t frees)
        : statistics(mallocs, frees, 0) {}
    statistics(uint64_t mallocs, uint64_t frees, uint64_t cross_cpu_frees)
        : _mallocs(mallocs), _frees(frees), _cross_cpu_frees(cross_cpu_frees) {}
public:
    // Total allocations performed on this cpu.
    uint64_t mallocs() const { return _mallocs; }
    // Total frees performed on this cpu.
    uint64_t frees() const { return _frees; }
    // Frees forwarded to another cpu's freelist.
    uint64_t cross_cpu_frees() const { return _cross_cpu_frees; }
    // Objects currently live (allocated but not yet freed).
    size_t live_objects() const { return mallocs() - frees(); }
    friend statistics stats();
};

View File

@@ -625,6 +625,13 @@ reactor::register_collectd_metrics() {
scollectd::make_typed(scollectd::data_type::DERIVE,
[] { return memory::stats().frees(); })
),
scollectd::add_polled_metric(
scollectd::type_instance_id("memory",
scollectd::per_cpu_plugin_instance,
"total_operations", "cross_cpu_free"),
scollectd::make_typed(scollectd::data_type::DERIVE,
[] { return memory::stats().cross_cpu_frees(); })
),
scollectd::add_polled_metric(
scollectd::type_instance_id("memory",
scollectd::per_cpu_plugin_instance,
@@ -703,6 +710,10 @@ int reactor::run() {
}
);
poller drain_cross_cpu_freelist([] {
return memory::drain_cross_cpu_freelist();
});
poller expire_lowres_timers([this] {
if (_lowres_next_timeout == lowres_clock::time_point()) {
return true;

View File

@@ -6,6 +6,7 @@ import subprocess
all_tests = [
'futures_test',
'smp_test',
'memcached/test_ascii_parser',
'sstring_test',
'output_stream_test',

61
tests/smp_test.cc Normal file
View File

@@ -0,0 +1,61 @@
/*
* Copyright (C) 2014 Cloudius Systems, Ltd.
*/
#include "core/reactor.hh"
#include "core/app-template.hh"
#include "core/print.hh"
// Run a trivial computation on cpu 1 and verify the result arrives back
// on the calling cpu intact.
future<bool> test_smp_call() {
    auto remote = smp::submit_to(1, [] {
        return make_ready_future<int>(3);
    });
    return remote.then([] (int value) {
        return make_ready_future<bool>(value == 3);
    });
}
struct nasty_exception {};
// Raise an exception on cpu 1 and verify it propagates back to the caller
// with its dynamic type preserved.  Returns true on success.
// (Leftover numbered debug print() calls and the debug-only temporary are
// removed; they were tracing output, not part of the test.)
future<bool> test_smp_exception() {
    return smp::submit_to(1, [] {
        return make_exception_future<int>(nasty_exception());
    }).then_wrapped([] (future<int> result) {
        try {
            result.get();
            return make_ready_future<bool>(false); // expected an exception
        } catch (nasty_exception&) {
            // all is well
            return make_ready_future<bool>(true);
        } catch (...) {
            // incorrect exception type
            return make_ready_future<bool>(false);
        }
    });
}
int tests, fails;
// Print a PASS/FAIL line for one test result and update the global
// test/failure counters.
future<>
report(sstring msg, future<bool>&& result) {
    return std::move(result).then([msg] (bool ok) {
        print("%s: %s\n", (ok ? "PASS" : "FAIL"), msg);
        ++tests;
        if (!ok) {
            ++fails;
        }
    });
}
// Entry point: run both smp tests in sequence, print a summary, and exit
// with a non-zero status if any test failed.
int main(int ac, char** av) {
return app_template().run(ac, av, [] {
return report("smp call", test_smp_call()).then([] {
return report("smp exception", test_smp_exception());
}).then([] {
print("\n%d tests / %d failures\n", tests, fails);
engine.exit(fails ? 1 : 0);
});
});
}