From 85b62d813205bc22b69fdbdef028ecc0f9a28676 Mon Sep 17 00:00:00 2001 From: Vlad Zolotarov Date: Sun, 25 Jan 2015 13:19:05 +0200 Subject: [PATCH 1/8] memory: hugetlbfs mapping may not be invalid Turn a condition into an assert() since if a mapping is invalid this may only mean that we have a bug. Signed-off-by: Vlad Zolotarov --- core/memory.cc | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/core/memory.cc b/core/memory.cc index 9b6a16558d..1c931d1871 100644 --- a/core/memory.cc +++ b/core/memory.cc @@ -567,9 +567,8 @@ void cpu_pages::init_virt_to_phys_map() { auto phys = std::numeric_limits::max(); auto pfn = reinterpret_cast(mem() + i * huge_page_size) / page_size; fd.pread(&entry, 8, pfn * 8); - if (entry & 0x8000'0000'0000'0000) { - phys = (entry & 0x007f'ffff'ffff'ffff) << page_bits; - } + assert(entry & 0x8000'0000'0000'0000); + phys = (entry & 0x007f'ffff'ffff'ffff) << page_bits; virt_to_phys_map[i] = phys; } } From b9554219dc977202f6880927c35d84122e98cd48 Mon Sep 17 00:00:00 2001 From: Gleb Natapov Date: Sun, 25 Jan 2015 14:35:27 +0200 Subject: [PATCH 2/8] core: add prefetch functions --- core/prefetch.hh | 97 ++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 97 insertions(+) create mode 100644 core/prefetch.hh diff --git a/core/prefetch.hh b/core/prefetch.hh new file mode 100644 index 0000000000..cc727f1273 --- /dev/null +++ b/core/prefetch.hh @@ -0,0 +1,97 @@ +/* + * Copyright (C) 2014 Cloudius Systems, Ltd. + */ + +#ifndef PREFETCH_HH_ +#define PREFETCH_HH_ + +#include +#include +#include +#include "align.hh" + +static constexpr size_t cacheline_size = 64; +template +struct prefetcher; + +template +struct prefetcher<0, RW, LOC> { + prefetcher(uintptr_t ptr) {} +}; + +template +struct prefetcher { + prefetcher(uintptr_t ptr) { + __builtin_prefetch(reinterpret_cast(ptr), RW, LOC); + std::atomic_signal_fence(std::memory_order_seq_cst); + prefetcher(ptr + 64); + } +}; + +// LOC is a locality from __buitin_prefetch() gcc documentation: +// "The value locality must be a compile-time constant integer between zero and three. A value of +// zero means that the data has no temporal locality, so it need not be left in the cache after +// the access. A value of three means that the data has a high degree of temporal locality and +// should be left in all levels of cache possible. Values of one and two mean, respectively, a +// low or moderate degree of temporal locality. The default is three." +template +void prefetch(T* ptr) { + prefetcher(reinterpret_cast(ptr)); +} + +template +void prefetch(Iterator begin, Iterator end) { + std::for_each(begin, end, [] (auto v) { prefetch(v); }); +} + +template +void prefetch_n(T** pptr) { + boost::mpl::for_each< boost::mpl::range_c >( [pptr] (size_t x) { prefetch(*(pptr + x)); } ); +} + +template +void prefetch(void* ptr) { + prefetcher(reinterpret_cast(ptr)); +} + +template +void prefetch_n(Iterator begin, Iterator end) { + std::for_each(begin, end, [] (auto v) { prefetch(v); }); +} + +template +void prefetch_n(T** pptr) { + boost::mpl::for_each< boost::mpl::range_c >( [pptr] (size_t x) { prefetch(*(pptr + x)); } ); +} + +template +void prefetchw(T* ptr) { + prefetcher(reinterpret_cast(ptr)); +} + +template +void prefetchw_n(Iterator begin, Iterator end) { + std::for_each(begin, end, [] (auto v) { prefetchw(v); }); +} + +template +void prefetchw_n(T** pptr) { + boost::mpl::for_each< boost::mpl::range_c >( [pptr] (size_t x) { prefetchw(*(pptr + x)); } ); +} + +template +void prefetchw(void* ptr) { + prefetcher(reinterpret_cast(ptr)); +} + +template +void prefetchw_n(Iterator begin, Iterator end) { + std::for_each(begin, end, [] (auto v) { prefetchw(v); }); +} + +template +void prefetchw_n(T** pptr) { + boost::mpl::for_each< boost::mpl::range_c >( [pptr] (size_t x) { prefetchw(*(pptr + x)); } ); +} + +#endif From ff4aca2ee0787b98d64090546adb63ef23b4dc7d Mon Sep 17 00:00:00 2001 From: Gleb Natapov Date: Sun, 25 Jan 2015 14:35:28 +0200 Subject: [PATCH 3/8] core: prefetch work items before processing --- core/reactor.cc | 39 +++++++++++++++++++++++++++------------ core/reactor.hh | 3 +++ 2 files changed, 30 insertions(+), 12 deletions(-) diff --git a/core/reactor.cc b/core/reactor.cc index c2401e6af2..8ca454c70f 100644 --- a/core/reactor.cc +++ b/core/reactor.cc @@ -24,6 +24,7 @@ #include #include #endif +#include "prefetch.hh" #ifdef HAVE_OSV #include @@ -975,16 +976,33 @@ void smp_message_queue::flush_response_batch() { _completed_fifo.clear(); } -size_t smp_message_queue::process_completions() { +template +size_t smp_message_queue::process_queue(lf_queue& q, Func process) { // copy batch to local memory in order to minimize // time in which cross-cpu data is accessed - work_item* items[queue_length]; - auto nr = _completed.pop(items); - for (unsigned i = 0; i < nr; ++i) { - items[i]->complete(); - delete items[i]; - } + work_item* items[queue_length + prefetch_cnt]; + work_item* wi; + if (!q.pop(wi)) + return 0; + // start prefecthing first item before popping the rest to overlap memory + // access with potential cache miss the second pop may cause + prefetch<2>(wi); + auto nr = q.pop(items); + std::fill(std::begin(items) + nr, std::begin(items) + nr + prefetch_cnt, nullptr); + unsigned i = 0; + do { + prefetch_n<2>(std::begin(items) + i, std::begin(items) + i + prefetch_cnt); + process(wi); + wi = items[i++]; + } while(i <= nr); + return nr + 1; +} + +size_t smp_message_queue::process_completions() { + auto nr = process_queue(_completed, [] (work_item* wi) { + wi->complete(); + }); _current_queue_length -= nr; _compl += nr; _last_cmpl_batch = nr; @@ -997,14 +1015,11 @@ void smp_message_queue::flush_request_batch() { } size_t smp_message_queue::process_incoming() { - work_item* items[queue_length]; - auto nr = _pending.pop(items); - for (unsigned i = 0; i < nr; ++i) { - auto wi = items[i]; + auto nr = process_queue(_pending, [this] (work_item* wi) { wi->process().then([this, wi] { respond(wi); }); - } + }); _received += nr; _last_rcv_batch = nr; return nr; diff --git a/core/reactor.hh b/core/reactor.hh index 4163f1deab..25477b97a8 100644 --- a/core/reactor.hh +++ b/core/reactor.hh @@ -385,6 +385,7 @@ private: class smp_message_queue { static constexpr size_t queue_length = 128; static constexpr size_t batch_size = 16; + static constexpr size_t prefetch_cnt = 2; struct work_item; using lf_queue = boost::lockfree::spsc_queue>; @@ -459,6 +460,8 @@ public: return fut; } void start(unsigned cpuid); + template + size_t process_queue(lf_queue& q, Func process); size_t process_incoming(); size_t process_completions(); private: From 0383459b939122cb84b0f6fbc77ee83abb75cc1c Mon Sep 17 00:00:00 2001 From: Gleb Natapov Date: Sun, 25 Jan 2015 17:36:41 +0200 Subject: [PATCH 4/8] core: prefetch only valid addresses Prefethcing non mapped address incurs address translation cost. --- core/reactor.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/reactor.cc b/core/reactor.cc index 8ca454c70f..3b4aef5fb2 100644 --- a/core/reactor.cc +++ b/core/reactor.cc @@ -988,7 +988,7 @@ size_t smp_message_queue::process_queue(lf_queue& q, Func process) { // access with potential cache miss the second pop may cause prefetch<2>(wi); auto nr = q.pop(items); - std::fill(std::begin(items) + nr, std::begin(items) + nr + prefetch_cnt, nullptr); + std::fill(std::begin(items) + nr, std::begin(items) + nr + prefetch_cnt, nr ? items[nr - 1] : wi); unsigned i = 0; do { prefetch_n<2>(std::begin(items) + i, std::begin(items) + i + prefetch_cnt); From 74f9f1fdd2c141496bbe00d718809e61dad3013a Mon Sep 17 00:00:00 2001 From: Gleb Natapov Date: Sun, 25 Jan 2015 17:36:42 +0200 Subject: [PATCH 5/8] core: prefetch different amount of work items for different queues Incoming item processing usually takes more work then completion item processing. Prefetch more completion items to make sure they are ready before access. --- core/reactor.cc | 12 ++++++------ core/reactor.hh | 2 +- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/core/reactor.cc b/core/reactor.cc index 3b4aef5fb2..9a4f759dcc 100644 --- a/core/reactor.cc +++ b/core/reactor.cc @@ -976,11 +976,11 @@ void smp_message_queue::flush_response_batch() { _completed_fifo.clear(); } -template +template size_t smp_message_queue::process_queue(lf_queue& q, Func process) { // copy batch to local memory in order to minimize // time in which cross-cpu data is accessed - work_item* items[queue_length + prefetch_cnt]; + work_item* items[queue_length + PrefetchCnt]; work_item* wi; if (!q.pop(wi)) return 0; @@ -988,10 +988,10 @@ size_t smp_message_queue::process_queue(lf_queue& q, Func process) { // access with potential cache miss the second pop may cause prefetch<2>(wi); auto nr = q.pop(items); - std::fill(std::begin(items) + nr, std::begin(items) + nr + prefetch_cnt, nr ? items[nr - 1] : wi); + std::fill(std::begin(items) + nr, std::begin(items) + nr + PrefetchCnt, nr ? items[nr - 1] : wi); unsigned i = 0; do { - prefetch_n<2>(std::begin(items) + i, std::begin(items) + i + prefetch_cnt); + prefetch_n<2>(std::begin(items) + i, std::begin(items) + i + PrefetchCnt); process(wi); wi = items[i++]; } while(i <= nr); @@ -1000,7 +1000,7 @@ size_t smp_message_queue::process_queue(lf_queue& q, Func process) { } size_t smp_message_queue::process_completions() { - auto nr = process_queue(_completed, [] (work_item* wi) { + auto nr = process_queue(_completed, [] (work_item* wi) { wi->complete(); }); _current_queue_length -= nr; @@ -1015,7 +1015,7 @@ void smp_message_queue::flush_request_batch() { } size_t smp_message_queue::process_incoming() { - auto nr = process_queue(_pending, [this] (work_item* wi) { + auto nr = process_queue(_pending, [this] (work_item* wi) { wi->process().then([this, wi] { respond(wi); }); diff --git a/core/reactor.hh b/core/reactor.hh index 25477b97a8..0d10ebd0af 100644 --- a/core/reactor.hh +++ b/core/reactor.hh @@ -460,7 +460,7 @@ public: return fut; } void start(unsigned cpuid); - template + template size_t process_queue(lf_queue& q, Func process); size_t process_incoming(); size_t process_completions(); From 18d212b04e566cd9099486faf220277d39e24cf4 Mon Sep 17 00:00:00 2001 From: Gleb Natapov Date: Tue, 27 Jan 2015 12:25:28 +0200 Subject: [PATCH 6/8] core: do not use separate thread_local variable to track pending signals Access to thread_local variable goes throw a helper function. --- core/reactor.cc | 11 +++++------ core/reactor.hh | 2 +- 2 files changed, 6 insertions(+), 7 deletions(-) diff --git a/core/reactor.cc b/core/reactor.cc index 9a4f759dcc..8cefdac565 100644 --- a/core/reactor.cc +++ b/core/reactor.cc @@ -102,7 +102,8 @@ reactor::reactor() , _cpu_started(0) , _io_context(0) , _io_context_available(max_aio) - , _reuseport(posix_reuseport_detect()) { + , _reuseport(posix_reuseport_detect()) + , _pending_signals(0) { auto r = ::io_setup(max_aio, &_io_context); assert(r >= 0); struct sigevent sev; @@ -556,16 +557,14 @@ reactor::receive_signal(int signo) { return sh._promise.get_future(); } -thread_local std::atomic reactor::signal_handler::pending; - void sigaction(int signo, siginfo_t* siginfo, void* ignore) { - reactor::signal_handler::pending.fetch_or(1ull << signo, std::memory_order_relaxed); + engine._pending_signals.fetch_or(1ull << signo, std::memory_order_relaxed); } bool reactor::poll_signal() { - auto signals = reactor::signal_handler::pending.load(std::memory_order_relaxed); + auto signals = _pending_signals.load(std::memory_order_relaxed); if (signals) { - reactor::signal_handler::pending.fetch_and(~signals, std::memory_order_relaxed); + _pending_signals.fetch_and(~signals, std::memory_order_relaxed); for (size_t i = 0; i < sizeof(signals)*8; i++) { if (signals & (1ull << i)) { _signal_handlers.at(i)._promise.set_value(); diff --git a/core/reactor.hh b/core/reactor.hh index 0d10ebd0af..ad181fb7c4 100644 --- a/core/reactor.hh +++ b/core/reactor.hh @@ -652,8 +652,8 @@ private: struct signal_handler { signal_handler(int signo); promise<> _promise; - static thread_local std::atomic pending; }; + std::atomic _pending_signals; std::unordered_map _signal_handlers; bool poll_signal(); friend void sigaction(int signo, siginfo_t* siginfo, void* ignore); From 7a92efe8d1f079e223e2b3243a6ecc44bd1878e2 Mon Sep 17 00:00:00 2001 From: Gleb Natapov Date: Tue, 27 Jan 2015 14:03:48 +0200 Subject: [PATCH 7/8] core: add local engine accessor function Do not use thread local engine variable directly, but use accessor instead. --- apps/httpd/httpd.cc | 2 +- apps/memcached/memcache.cc | 30 ++++++++-------- core/app-template.cc | 6 ++-- core/distributed.hh | 8 ++--- core/reactor.cc | 70 +++++++++++++++++++------------------- core/reactor.hh | 48 ++++++++++++++------------ core/scollectd.cc | 4 +-- core/xen/evtchn.cc | 2 +- net/dhcp.cc | 2 +- net/dpdk.cc | 2 +- net/ip.cc | 4 +-- net/native-stack.cc | 16 ++++----- net/net.cc | 18 +++++----- net/net.hh | 4 +-- net/posix-stack.cc | 14 ++++---- net/posix-stack.hh | 4 +-- net/proxy.cc | 2 +- net/tcp.hh | 2 +- net/virtio.cc | 2 +- tests/blkdiscard_test.cc | 4 +-- tests/directory_test.cc | 4 +-- tests/echotest.cc | 2 +- tests/fileiotest.cc | 4 +-- tests/ip_test.cc | 2 +- tests/l3_test.cc | 2 +- tests/smp_test.cc | 2 +- tests/tcp_client.cc | 2 +- tests/tcp_server.cc | 2 +- tests/tcp_test.cc | 4 +-- tests/test-reactor.cc | 4 +-- tests/test-utils.hh | 8 ++--- tests/udp_client.cc | 2 +- tests/udp_server.cc | 2 +- tests/udp_zero_copy.cc | 2 +- 34 files changed, 145 insertions(+), 141 deletions(-) diff --git a/apps/httpd/httpd.cc b/apps/httpd/httpd.cc index 0d9b06782d..a6ee4cb6e8 100644 --- a/apps/httpd/httpd.cc +++ b/apps/httpd/httpd.cc @@ -39,7 +39,7 @@ public: future<> listen(ipv4_addr addr) { listen_options lo; lo.reuse_address = true; - _listeners.push_back(engine.listen(make_ipv4_address(addr), lo)); + _listeners.push_back(engine().listen(make_ipv4_address(addr), lo)); do_accepts(_listeners.size() - 1); return make_ready_future<>(); } diff --git a/apps/memcached/memcache.cc b/apps/memcached/memcache.cc index 1f147f8ed5..d7229f4175 100644 --- a/apps/memcached/memcache.cc +++ b/apps/memcached/memcache.cc @@ -1101,7 +1101,7 @@ public: } ss << histo[i] << "\n"; } - return {engine.cpu_id(), make_foreign(make_lw_shared(ss.str()))}; + return {engine().cpu_id(), make_foreign(make_lw_shared(ss.str()))}; } future<> stop() { return make_ready_future<>(); } @@ -1130,7 +1130,7 @@ public: // The caller must keep @insertion live until the resulting future resolves. future set(item_insertion_data& insertion) { auto cpu = get_cpu(insertion.key); - if (engine.cpu_id() == cpu) { + if (engine().cpu_id() == cpu) { return make_ready_future(_peers.local().set(insertion)); } return _peers.invoke_on(cpu, &cache::remote_set, std::ref(insertion)); @@ -1139,7 +1139,7 @@ public: // The caller must keep @insertion live until the resulting future resolves. future add(item_insertion_data& insertion) { auto cpu = get_cpu(insertion.key); - if (engine.cpu_id() == cpu) { + if (engine().cpu_id() == cpu) { return make_ready_future(_peers.local().add(insertion)); } return _peers.invoke_on(cpu, &cache::remote_add, std::ref(insertion)); @@ -1148,7 +1148,7 @@ public: // The caller must keep @insertion live until the resulting future resolves. future replace(item_insertion_data& insertion) { auto cpu = get_cpu(insertion.key); - if (engine.cpu_id() == cpu) { + if (engine().cpu_id() == cpu) { return make_ready_future(_peers.local().replace(insertion)); } return _peers.invoke_on(cpu, &cache::remote_replace, std::ref(insertion)); @@ -1169,7 +1169,7 @@ public: // The caller must keep @insertion live until the resulting future resolves. future cas(item_insertion_data& insertion, typename item::version_type version) { auto cpu = get_cpu(insertion.key); - if (engine.cpu_id() == cpu) { + if (engine().cpu_id() == cpu) { return make_ready_future(_peers.local().cas(insertion, version)); } return _peers.invoke_on(cpu, &cache::remote_cas, std::ref(insertion), std::move(version)); @@ -1182,7 +1182,7 @@ public: // The caller must keep @key live until the resulting future resolves. future, bool>> incr(item_key& key, uint64_t delta) { auto cpu = get_cpu(key); - if (engine.cpu_id() == cpu) { + if (engine().cpu_id() == cpu) { return make_ready_future, bool>>( _peers.local().incr(key, delta)); } @@ -1192,7 +1192,7 @@ public: // The caller must keep @key live until the resulting future resolves. future, bool>> decr(item_key& key, uint64_t delta) { auto cpu = get_cpu(key); - if (engine.cpu_id() == cpu) { + if (engine().cpu_id() == cpu) { return make_ready_future, bool>>( _peers.local().decr(key, delta)); } @@ -1658,7 +1658,7 @@ public: } void start() { - _chan = engine.net().make_udp_channel({_port}); + _chan = engine().net().make_udp_channel({_port}); keep_doing([this] { return _chan.receive().then([this](udp_datagram dgram) { packet& p = dgram.get_data(); @@ -1734,7 +1734,7 @@ public: void start() { listen_options lo; lo.reuse_address = true; - _listener = engine.listen(make_ipv4_address({_port}), lo); + _listener = engine().listen(make_ipv4_address({_port}), lo); keep_doing([this] { return _listener->accept().then([this] (connected_socket fd, socket_address addr) mutable { auto conn = make_lw_shared(std::move(fd), addr, _cache, _system_stats); @@ -1811,10 +1811,10 @@ int start_instance(int ac, char** av) { ; return app.run(ac, av, [&] { - engine.at_exit([&] { return tcp_server.stop(); }); - engine.at_exit([&] { return udp_server.stop(); }); - engine.at_exit([&] { return cache_peers.stop(); }); - engine.at_exit([&] { return system_stats.stop(); }); + engine().at_exit([&] { return tcp_server.stop(); }); + engine().at_exit([&] { return udp_server.stop(); }); + engine().at_exit([&] { return cache_peers.stop(); }); + engine().at_exit([&] { return system_stats.stop(); }); auto&& config = app.configuration(); return cache_peers.start().then([&system_stats] { @@ -1822,7 +1822,7 @@ int start_instance(int ac, char** av) { }).then([&] { if (WithFlashCache) { auto device_path = config["device"].as(); - return engine.open_file_dma(device_path).then([&] (file f) { + return engine().open_file_dma(device_path).then([&] (file f) { auto dev = make_lw_shared({std::move(f)}); return dev->f().stat().then([&, dev] (struct stat st) mutable { assert(S_ISBLK(st.st_mode)); @@ -1849,7 +1849,7 @@ int start_instance(int ac, char** av) { }).then([&tcp_server] { return tcp_server.invoke_on_all(&memcache::tcp_server::start); }).then([&] { - if (engine.net().has_per_core_namespace()) { + if (engine().net().has_per_core_namespace()) { return udp_server.start(std::ref(cache), std::ref(system_stats)); } else { return udp_server.start_single(std::ref(cache), std::ref(system_stats)); diff --git a/core/app-template.cc b/core/app-template.cc index 1b09daa5b2..2669e71ac8 100644 --- a/core/app-template.cc +++ b/core/app-template.cc @@ -58,7 +58,7 @@ app_template::run(int ac, char ** av, std::function&& func) { } smp::configure(configuration); _configuration = {std::move(configuration)}; - engine.when_started().then([this] { + engine().when_started().then([this] { scollectd::configure( this->configuration()); }).then( std::move(func) @@ -67,10 +67,10 @@ app_template::run(int ac, char ** av, std::function&& func) { get_ex(); } catch (std::exception& ex) { std::cout << "program failed with uncaught exception: " << ex.what() << "\n"; - engine.exit(1); + engine().exit(1); } }); - return engine.run(); + return engine().run(); } diff --git a/core/distributed.hh b/core/distributed.hh index 15b1e22d61..7949d2f597 100644 --- a/core/distributed.hh +++ b/core/distributed.hh @@ -177,7 +177,7 @@ distributed::invoke_on_all(void (Service::*func)(Args...), Args... args template Service& distributed::local() { - return *_instances[engine.cpu_id()]; + return *_instances[engine().cpu_id()]; } // Smart pointer wrapper which makes it safe to move across CPUs. @@ -190,19 +190,19 @@ private: unsigned _cpu; private: bool on_origin() { - return engine.cpu_id() == _cpu; + return engine().cpu_id() == _cpu; } public: using element_type = typename PtrType::element_type; foreign_ptr() : _value(PtrType()) - , _cpu(engine.cpu_id()) { + , _cpu(engine().cpu_id()) { } foreign_ptr(std::nullptr_t) : foreign_ptr() {} foreign_ptr(PtrType value) : _value(std::move(value)) - , _cpu(engine.cpu_id()) { + , _cpu(engine().cpu_id()) { } // The type is intentionally non-copyable because copies // are expensive because each copy requires across-CPU call. diff --git a/core/reactor.cc b/core/reactor.cc index 8cefdac565..ffe7f12dd8 100644 --- a/core/reactor.cc +++ b/core/reactor.cc @@ -153,7 +153,7 @@ future<> reactor_backend_epoll::get_epoll_future(pollable_fd_state& pfd, eevt.data.ptr = &pfd; int r = ::epoll_ctl(_epollfd.get(), ctl, pfd.fd.get(), &eevt); assert(r == 0); - engine.start_epoll(); + engine().start_epoll(); } pfd.*pr = promise<>(); return (pfd.*pr).get_future(); @@ -273,7 +273,7 @@ bool reactor::process_io() future posix_file_impl::write_dma(uint64_t pos, const void* buffer, size_t len) { - return engine.submit_io([this, pos, buffer, len] (iocb& io) { + return engine().submit_io([this, pos, buffer, len] (iocb& io) { io_prep_pwrite(&io, _fd, const_cast(buffer), len, pos); }).then([] (io_event ev) { throw_kernel_error(long(ev.res)); @@ -283,7 +283,7 @@ posix_file_impl::write_dma(uint64_t pos, const void* buffer, size_t len) { future posix_file_impl::write_dma(uint64_t pos, std::vector iov) { - return engine.submit_io([this, pos, iov = std::move(iov)] (iocb& io) { + return engine().submit_io([this, pos, iov = std::move(iov)] (iocb& io) { io_prep_pwritev(&io, _fd, iov.data(), iov.size(), pos); }).then([] (io_event ev) { throw_kernel_error(long(ev.res)); @@ -293,7 +293,7 @@ posix_file_impl::write_dma(uint64_t pos, std::vector iov) { future posix_file_impl::read_dma(uint64_t pos, void* buffer, size_t len) { - return engine.submit_io([this, pos, buffer, len] (iocb& io) { + return engine().submit_io([this, pos, buffer, len] (iocb& io) { io_prep_pread(&io, _fd, buffer, len, pos); }).then([] (io_event ev) { throw_kernel_error(long(ev.res)); @@ -303,7 +303,7 @@ posix_file_impl::read_dma(uint64_t pos, void* buffer, size_t len) { future posix_file_impl::read_dma(uint64_t pos, std::vector iov) { - return engine.submit_io([this, pos, iov = std::move(iov)] (iocb& io) { + return engine().submit_io([this, pos, iov = std::move(iov)] (iocb& io) { io_prep_preadv(&io, _fd, iov.data(), iov.size(), pos); }).then([] (io_event ev) { throw_kernel_error(long(ev.res)); @@ -333,7 +333,7 @@ reactor::open_directory(sstring name) { future<> posix_file_impl::flush(void) { - return engine._thread_pool.submit>([this] { + return engine()._thread_pool.submit>([this] { return wrap_syscall(::fsync(_fd)); }).then([] (syscall_result sr) { sr.throw_if_error(); @@ -343,7 +343,7 @@ posix_file_impl::flush(void) { future posix_file_impl::stat(void) { - return engine._thread_pool.submit>([this] { + return engine()._thread_pool.submit>([this] { struct stat st; auto ret = ::fstat(_fd, &st); return wrap_syscall(ret, st); @@ -355,7 +355,7 @@ posix_file_impl::stat(void) { future<> posix_file_impl::discard(uint64_t offset, uint64_t length) { - return engine._thread_pool.submit>([this, offset, length] () mutable { + return engine()._thread_pool.submit>([this, offset, length] () mutable { return wrap_syscall(::fallocate(_fd, FALLOC_FL_PUNCH_HOLE|FALLOC_FL_KEEP_SIZE, offset, length)); }).then([] (syscall_result sr) { @@ -366,7 +366,7 @@ posix_file_impl::discard(uint64_t offset, uint64_t length) { future<> blockdev_file_impl::discard(uint64_t offset, uint64_t length) { - return engine._thread_pool.submit>([this, offset, length] () mutable { + return engine()._thread_pool.submit>([this, offset, length] () mutable { uint64_t range[2] { offset, length }; return wrap_syscall(::ioctl(_fd, BLKDISCARD, &range)); }).then([] (syscall_result sr) { @@ -384,7 +384,7 @@ posix_file_impl::size(void) { future blockdev_file_impl::size(void) { - return engine._thread_pool.submit>([this] { + return engine()._thread_pool.submit>([this] { size_t size; int ret = ::ioctl(_fd, BLKGETSIZE64, &size); return wrap_syscall(ret, size); @@ -431,7 +431,7 @@ posix_file_impl::list_directory(std::function (directory_entry de)> nex auto eofcond = [w] { return w->eof; }; return do_until(eofcond, [w, this] { if (w->current == w->total) { - return engine._thread_pool.submit>([w , this] () { + return engine()._thread_pool.submit>([w , this] () { auto ret = ::syscall(__NR_getdents, _fd, reinterpret_cast(w->buffer), sizeof(w->buffer)); return wrap_syscall(ret); }).then([w] (syscall_result ret) { @@ -524,13 +524,13 @@ future<> reactor::run_exit_tasks() { } void reactor::stop() { - assert(engine._id == 0); + assert(engine()._id == 0); run_exit_tasks().then([this] { auto sem = new semaphore(0); for (unsigned i = 1; i < smp::count; i++) { smp::submit_to<>(i, []() { - return engine.run_exit_tasks().then([] { - engine._stopped = true; + return engine().run_exit_tasks().then([] { + engine()._stopped = true; }); }).then([sem, i]() { sem->signal(); @@ -558,7 +558,7 @@ reactor::receive_signal(int signo) { } void sigaction(int signo, siginfo_t* siginfo, void* ignore) { - engine._pending_signals.fetch_or(1ull << signo, std::memory_order_relaxed); + engine()._pending_signals.fetch_or(1ull << signo, std::memory_order_relaxed); } bool reactor::poll_signal() { @@ -691,7 +691,7 @@ int reactor::run() { _network_stack = std::move(stack); for (unsigned c = 0; c < smp::count; c++) { smp::submit_to(c, [] { - engine._cpu_started.signal(); + engine()._cpu_started.signal(); }); } }); @@ -804,7 +804,7 @@ public: explicit registration_task(poller* p) : _p(p) {} virtual void run() noexcept override { if (_p) { - engine.register_poller(_p->_pollfn.get()); + engine().register_poller(_p->_pollfn.get()); _p->_registration_task = nullptr; } } @@ -822,7 +822,7 @@ private: public: explicit deregistration_task(std::unique_ptr&& p) : _p(std::move(p)) {} virtual void run() noexcept override { - engine.unregister_poller(_p.get()); + engine().unregister_poller(_p.get()); } }; @@ -862,7 +862,7 @@ reactor::poller::do_register() { // the poller instead. auto task = std::make_unique(this); auto tmp = task.get(); - engine.add_task(std::move(task)); + engine().add_task(std::move(task)); _registration_task = tmp; } @@ -883,8 +883,8 @@ reactor::poller::~poller() { auto dummy = make_pollfn([] { return false; }); auto dummy_p = dummy.get(); auto task = std::make_unique(std::move(dummy)); - engine.add_task(std::move(task)); - engine.replace_poller(_pollfn.get(), dummy_p); + engine().add_task(std::move(task)); + engine().replace_poller(_pollfn.get(), dummy_p); } } } @@ -965,7 +965,7 @@ void smp_message_queue::submit_item(smp_message_queue::work_item* item) { void smp_message_queue::respond(work_item* item) { _completed_fifo.push_back(item); - if (_completed_fifo.size() >= batch_size || engine._stopped) { + if (_completed_fifo.size() >= batch_size || engine()._stopped) { flush_response_batch(); } } @@ -1027,7 +1027,7 @@ size_t smp_message_queue::process_incoming() { void smp_message_queue::start(unsigned cpuid) { _tx.init(); char instance[10]; - std::snprintf(instance, sizeof(instance), "%u-%u", engine.cpu_id(), cpuid); + std::snprintf(instance, sizeof(instance), "%u-%u", engine().cpu_id(), cpuid); _collectd_regs = { // queue_length value:GAUGE:0:U // Absolute value of num packets in last tx batch. @@ -1076,7 +1076,7 @@ void smp_message_queue::start(unsigned cpuid) { #ifndef HAVE_OSV thread_pool::thread_pool() : _worker_thread([this] { work(); }), _notify(pthread_self()) { keep_doing([this] { - return engine.receive_signal(SIGUSR1).then([this] { inter_thread_wq.complete(); }); + return engine().receive_signal(SIGUSR1).then([this] { inter_thread_wq.complete(); }); }); } @@ -1132,7 +1132,7 @@ file_desc readable_eventfd::try_create_eventfd(size_t initial) { } future readable_eventfd::wait() { - return engine.readable(*_fd._s).then([this] { + return engine().readable(*_fd._s).then([this] { uint64_t count; int r = ::read(_fd.get_fd(), &count, sizeof(count)); assert(r == sizeof(count)); @@ -1141,7 +1141,7 @@ future readable_eventfd::wait() { } void schedule(std::unique_ptr t) { - engine.add_task(std::move(t)); + engine().add_task(std::move(t)); } bool operator==(const ::sockaddr_in a, const ::sockaddr_in b) { @@ -1219,8 +1219,8 @@ unsigned smp::count = 1; void smp::start_all_queues() { for (unsigned c = 0; c < count; c++) { - if (c != engine.cpu_id()) { - _qs[c][engine.cpu_id()].start(c); + if (c != engine().cpu_id()) { + _qs[c][engine().cpu_id()].start(c); } } } @@ -1300,11 +1300,11 @@ void smp::configure(boost::program_options::variables_map configuration) sigfillset(&mask); auto r = ::sigprocmask(SIG_BLOCK, &mask, NULL); throw_system_error_on(r == -1); - engine._id = i; + engine()._id = i; start_all_queues(); inited.wait(); - engine.configure(configuration); - engine.run(); + engine().configure(configuration); + engine().run(); }); } @@ -1317,14 +1317,14 @@ void smp::configure(boost::program_options::variables_map configuration) start_all_queues(); inited.wait(); - engine.configure(configuration); - engine._lowres_clock = std::make_unique(); + engine().configure(configuration); + engine()._lowres_clock = std::make_unique(); } __thread size_t future_avail_count = 0; __thread size_t task_quota = 0; -thread_local reactor engine; +thread_local reactor local_engine; class reactor_notifier_epoll : public reactor_notifier { @@ -1361,7 +1361,7 @@ class reactor_notifier_osv : bool _needed = false; public: virtual future<> wait() override { - return engine.notified(this); + return engine().notified(this); } virtual void signal() override { wake(); diff --git a/core/reactor.hh b/core/reactor.hh index ad181fb7c4..0bde67064c 100644 --- a/core/reactor.hh +++ b/core/reactor.hh @@ -811,9 +811,13 @@ reactor::make_pollfn(Func&& func) { return std::make_unique(std::forward(func)); } -extern thread_local reactor engine; +extern thread_local reactor local_engine; extern __thread size_t task_quota; +inline reactor& engine() { + return local_engine; +} + class smp { #if HAVE_DPDK using thread_adaptor = std::function; @@ -837,10 +841,10 @@ public: template static std::result_of_t submit_to(unsigned t, Func func, std::enable_if_t::value, void*> = nullptr) { - if (t == engine.cpu_id()) { + if (t == engine().cpu_id()) { return func(); } else { - return _qs[t][engine.cpu_id()].submit(std::move(func)); + return _qs[t][engine().cpu_id()].submit(std::move(func)); } } template @@ -861,11 +865,11 @@ public: static bool poll_queues() { size_t got = 0; for (unsigned i = 0; i < count; i++) { - if (engine.cpu_id() != i) { - auto& rxq = _qs[engine.cpu_id()][i]; + if (engine().cpu_id() != i) { + auto& rxq = _qs[engine().cpu_id()][i]; rxq.flush_response_batch(); got += rxq.process_incoming(); - auto& txq = _qs[i][engine._id]; + auto& txq = _qs[i][engine()._id]; txq.flush_request_batch(); got += txq.process_completions(); } @@ -881,7 +885,7 @@ public: inline pollable_fd_state::~pollable_fd_state() { - engine.forget(*this); + engine().forget(*this); } class data_source_impl { @@ -1315,32 +1319,32 @@ output_stream::flush() { inline future pollable_fd::read_some(char* buffer, size_t size) { - return engine.read_some(*_s, buffer, size); + return engine().read_some(*_s, buffer, size); } inline future pollable_fd::read_some(uint8_t* buffer, size_t size) { - return engine.read_some(*_s, buffer, size); + return engine().read_some(*_s, buffer, size); } inline future pollable_fd::read_some(const std::vector& iov) { - return engine.read_some(*_s, iov); + return engine().read_some(*_s, iov); } inline future<> pollable_fd::write_all(const char* buffer, size_t size) { - return engine.write_all(*_s, buffer, size); + return engine().write_all(*_s, buffer, size); } inline future<> pollable_fd::write_all(const uint8_t* buffer, size_t size) { - return engine.write_all(*_s, buffer, size); + return engine().write_all(*_s, buffer, size); } inline future pollable_fd::write_some(net::packet& p) { - return engine.writeable(*_s).then([this, &p] () mutable { + return engine().writeable(*_s).then([this, &p] () mutable { static_assert(offsetof(iovec, iov_base) == offsetof(net::fragment, base) && sizeof(iovec::iov_base) == sizeof(net::fragment::base) && offsetof(iovec, iov_len) == offsetof(net::fragment, size) && @@ -1374,22 +1378,22 @@ future<> pollable_fd::write_all(net::packet& p) { inline future<> pollable_fd::readable() { - return engine.readable(*_s); + return engine().readable(*_s); } inline future<> pollable_fd::writeable() { - return engine.writeable(*_s); + return engine().writeable(*_s); } inline future pollable_fd::accept() { - return engine.accept(*_s); + return engine().accept(*_s); } inline future pollable_fd::recvmsg(struct msghdr *msg) { - return engine.readable(*_s).then([this, msg] { + return engine().readable(*_s).then([this, msg] { auto r = get_file_desc().recvmsg(msg, 0); if (!r) { return recvmsg(msg); @@ -1408,7 +1412,7 @@ future pollable_fd::recvmsg(struct msghdr *msg) { inline future pollable_fd::sendmsg(struct msghdr* msg) { - return engine.writeable(*_s).then([this, msg] () mutable { + return engine().writeable(*_s).then([this, msg] () mutable { auto r = get_file_desc().sendmsg(msg, 0); if (!r) { return sendmsg(msg); @@ -1425,7 +1429,7 @@ future pollable_fd::sendmsg(struct msghdr* msg) { inline future pollable_fd::sendto(socket_address addr, const void* buf, size_t len) { - return engine.writeable(*_s).then([this, buf, len, addr] () mutable { + return engine().writeable(*_s).then([this, buf, len, addr] () mutable { auto r = get_file_desc().sendto(addr, buf, len, 0); if (!r) { return sendto(std::move(addr), buf, len); @@ -1442,7 +1446,7 @@ template inline timer::~timer() { if (_queued) { - engine.del_timer(this); + engine().del_timer(this); } } @@ -1460,7 +1464,7 @@ void timer::arm(time_point until, boost::optional period) { _armed = true; _expired = false; _expiry = until; - engine.add_timer(this); + engine().add_timer(this); _queued = true; } @@ -1493,7 +1497,7 @@ bool timer::cancel() { } _armed = false; if (_queued) { - engine.del_timer(this); + engine().del_timer(this); _queued = false; } return true; diff --git a/core/scollectd.cc b/core/scollectd.cc index 4fb89e5428..c14519d551 100644 --- a/core/scollectd.cc +++ b/core/scollectd.cc @@ -93,7 +93,7 @@ public: _period = period; _addr = addr; _host = host; - _chan = engine.net().make_udp_channel(); + _chan = engine().net().make_udp_channel(); _timer.set_callback(std::bind(&impl::run, this)); // dogfood ourselves @@ -285,7 +285,7 @@ private: // Optional put_cached(part_type::PluginInst, id.plugin_instance() == per_cpu_plugin_instance ? - std::to_string(engine.cpu_id()) : id.plugin_instance()); + std::to_string(engine().cpu_id()) : id.plugin_instance()); put_cached(part_type::Type, id.type()); // Optional put_cached(part_type::TypeInst, id.type_instance()); diff --git a/core/xen/evtchn.cc b/core/xen/evtchn.cc index d696120d35..1f66ea375d 100644 --- a/core/xen/evtchn.cc +++ b/core/xen/evtchn.cc @@ -136,7 +136,7 @@ class kernel_evtchn: public evtchn { public: kernel_evtchn(unsigned otherend) : evtchn(otherend) - , _notified(engine.make_reactor_notifier()) {} + , _notified(engine().make_reactor_notifier()) {} virtual port bind() override; virtual void notify(int port) override; }; diff --git a/net/dhcp.cc b/net/dhcp.cc index 255cf80669..8933a43702 100644 --- a/net/dhcp.cc +++ b/net/dhcp.cc @@ -316,7 +316,7 @@ public: return make_ready_future<>(); } handled = true; - auto src_cpu = engine.cpu_id(); + auto src_cpu = engine().cpu_id(); if (src_cpu == 0) { return process_packet(std::move(p), dhp, opt_off); } diff --git a/net/dpdk.cc b/net/dpdk.cc index 2a946bdce8..8e39b88aeb 100644 --- a/net/dpdk.cc +++ b/net/dpdk.cc @@ -136,7 +136,7 @@ public: dpdk_device(uint8_t port_idx, uint16_t num_queues) : _port_idx(port_idx) , _num_queues(num_queues) - , _home_cpu(engine.cpu_id()) { + , _home_cpu(engine().cpu_id()) { /* now initialise the port we will use */ int ret = init_port_start(); diff --git a/net/ip.cc b/net/ip.cc index e9f3094822..d8e2783013 100644 --- a/net/ip.cc +++ b/net/ip.cc @@ -151,7 +151,7 @@ ipv4::handle_received_packet(packet p, ethernet_address from) { auto dropped_size = frag.mem_size; auto& ip_data = frag.data.map.begin()->second; // Choose a cpu to forward this packet - auto cpu_id = engine.cpu_id(); + auto cpu_id = engine().cpu_id(); auto l4 = _l4[h.ip_proto]; if (l4) { size_t l4_offset = 0; @@ -163,7 +163,7 @@ ipv4::handle_received_packet(packet p, ethernet_address from) { } // No need to forward if the dst cpu is the current cpu - if (cpu_id == engine.cpu_id()) { + if (cpu_id == engine().cpu_id()) { l4->received(std::move(ip_data), h.src_ip, h.dst_ip); } else { auto to = _netif->hw_address(); diff --git a/net/native-stack.cc b/net/native-stack.cc index 1bd542678b..2ffdd8f24a 100644 --- a/net/native-stack.cc +++ b/net/native-stack.cc @@ -77,7 +77,7 @@ void create_native_net_device(boost::program_options::variables_map opts) { std::shared_ptr sdev(dev.release()); for (unsigned i = 0; i < smp::count; i++) { smp::submit_to(i, [opts, sdev] { - uint16_t qid = engine.cpu_id(); + uint16_t qid = engine().cpu_id(); if (qid < sdev->hw_queues_count()) { auto qp = sdev->init_local_queue(opts, qid); std::map cpu_weights; @@ -130,7 +130,7 @@ public: virtual udp_channel make_udp_channel(ipv4_addr addr) override; virtual future<> initialize() override; static future> create(boost::program_options::variables_map opts) { - if (engine.cpu_id() == 0) { + if (engine().cpu_id() == 0) { create_native_net_device(opts); } return ready_promise.get_future(); @@ -199,7 +199,7 @@ future<> native_network_stack::run_dhcp(bool is_renew, const dhcp::lease& res) { // Hijack the ip-stack. for (unsigned i = 0; i < smp::count; i++) { smp::submit_to(i, [d] { - auto & ns = static_cast(engine.net()); + auto & ns = static_cast(engine().net()); ns.set_ipv4_packet_filter(d->get_ipv4_filter()); }); } @@ -209,7 +209,7 @@ future<> native_network_stack::run_dhcp(bool is_renew, const dhcp::lease& res) { return fut.then([this, d, is_renew](bool success, const dhcp::lease & res) { for (unsigned i = 0; i < smp::count; i++) { smp::submit_to(i, [] { - auto & ns = static_cast(engine.net()); + auto & ns = static_cast(engine().net()); ns.set_ipv4_packet_filter(nullptr); }); } @@ -228,12 +228,12 @@ void native_network_stack::on_dhcp(bool success, const dhcp::lease & res, bool i _config.set_value(); } - if (engine.cpu_id() == 0) { + if (engine().cpu_id() == 0) { // And the other cpus, which, in the case of initial discovery, // will be waiting for us. for (unsigned i = 1; i < smp::count; i++) { smp::submit_to(i, [success, res, is_renew]() { - auto & ns = static_cast(engine.net()); + auto & ns = static_cast(engine().net()); ns.on_dhcp(success, res, is_renew); }); } @@ -259,7 +259,7 @@ future<> native_network_stack::initialize() { // Only run actual discover on main cpu. // All other cpus must simply for main thread to complete and signal them. - if (engine.cpu_id() == 0) { + if (engine().cpu_id() == 0) { run_dhcp(); } return _config.get_future(); @@ -270,7 +270,7 @@ void arp_learn(ethernet_address l2, ipv4_address l3) { for (unsigned i = 0; i < smp::count; i++) { smp::submit_to(i, [l2, l3] { - auto & ns = static_cast(engine.net()); + auto & ns = static_cast(engine().net()); ns.arp_learn(l2, l3); }); } diff --git a/net/net.cc b/net/net.cc index 3de9e7d7da..d266a4bb3a 100644 --- a/net/net.cc +++ b/net/net.cc @@ -77,7 +77,7 @@ qp::~qp() { void qp::configure_proxies(const std::map& cpu_weights) { assert(!cpu_weights.empty()); - if ((cpu_weights.size() == 1 && cpu_weights.begin()->first == engine.cpu_id())) { + if ((cpu_weights.size() == 1 && cpu_weights.begin()->first == engine().cpu_id())) { // special case queue sending to self only, to avoid requiring a hash value return; } @@ -113,15 +113,15 @@ void qp::build_sw_reta(const std::map& cpu_weights) { subscription device::receive(std::function (packet)> next_packet) { - auto sub = _queues[engine.cpu_id()]->_rx_stream.listen(std::move(next_packet)); - _queues[engine.cpu_id()]->rx_start(); + auto sub = _queues[engine().cpu_id()]->_rx_stream.listen(std::move(next_packet)); + _queues[engine().cpu_id()]->rx_start(); return std::move(sub); } void device::set_local_queue(std::unique_ptr dev) { - assert(!_queues[engine.cpu_id()]); - _queues[engine.cpu_id()] = dev.get(); - engine.at_destroy([dev = std::move(dev)] {}); + assert(!_queues[engine().cpu_id()]); + _queues[engine().cpu_id()] = dev.get(); + engine().at_destroy([dev = std::move(dev)] {}); } @@ -181,7 +181,7 @@ void interface::forward(unsigned cpuid, packet p) { if (queue_depth < 1000) { queue_depth++; - auto src_cpu = engine.cpu_id(); + auto src_cpu = engine().cpu_id(); smp::submit_to(cpuid, [this, p = std::move(p), src_cpu]() mutable { _dev->l2receive(p.free_on_cpu(src_cpu)); }).then([] { @@ -196,7 +196,7 @@ future<> interface::dispatch_packet(packet p) { auto i = _proto_map.find(ntoh(eh->eth_proto)); if (i != _proto_map.end()) { l3_rx_stream& l3 = i->second; - auto fw = _dev->forward_dst(engine.cpu_id(), [&p, &l3] () { + auto fw = _dev->forward_dst(engine().cpu_id(), [&p, &l3] () { auto hwrss = p.rss_hash(); if (hwrss) { return hwrss.value(); @@ -208,7 +208,7 @@ future<> interface::dispatch_packet(packet p) { return 0u; } }); - if (fw != engine.cpu_id()) { + if (fw != engine().cpu_id()) { forward(fw, std::move(p)); } else { auto h = ntoh(*eh); diff --git a/net/net.hh b/net/net.hh index 401c166361..7016c3b49a 100644 --- a/net/net.hh +++ b/net/net.hh @@ -169,8 +169,8 @@ public: } virtual ~device() {}; qp& queue_for_cpu(unsigned cpu) { return *_queues[cpu]; } - qp& local_queue() { return queue_for_cpu(engine.cpu_id()); } - void l2receive(packet p) { _queues[engine.cpu_id()]->_rx_stream.produce(std::move(p)); } + qp& local_queue() { return queue_for_cpu(engine().cpu_id()); } + void l2receive(packet p) { _queues[engine().cpu_id()]->_rx_stream.produce(std::move(p)); } subscription receive(std::function (packet)> next_packet); virtual ethernet_address hw_address() = 0; virtual net::hw_features hw_features() = 0; diff --git a/net/posix-stack.cc b/net/posix-stack.cc index 7e565000c2..974df87d20 100644 --- a/net/posix-stack.cc +++ b/net/posix-stack.cc @@ -29,7 +29,7 @@ posix_server_socket_impl::accept() { static unsigned balance = 0; auto cpu = balance++ % smp::count; - if (cpu == engine.cpu_id()) { + if (cpu == engine().cpu_id()) { std::unique_ptr csi(new posix_connected_socket_impl(std::move(fd))); return make_ready_future( connected_socket(std::move(csi)), sa); @@ -126,14 +126,14 @@ posix_data_sink_impl::put(packet p) { server_socket posix_network_stack::listen(socket_address sa, listen_options opt) { if (_reuseport) - return server_socket(std::make_unique(sa, engine.posix_listen(sa, opt))); + return server_socket(std::make_unique(sa, engine().posix_listen(sa, opt))); else - return server_socket(std::make_unique(sa, engine.posix_listen(sa, opt))); + return server_socket(std::make_unique(sa, engine().posix_listen(sa, opt))); } future posix_network_stack::connect(socket_address sa) { - return engine.posix_connect(sa).then([] (pollable_fd fd) { + return engine().posix_connect(sa).then([] (pollable_fd fd) { std::unique_ptr csi(new posix_connected_socket_impl(std::move(fd))); return make_ready_future(connected_socket(std::move(csi))); }); @@ -145,14 +145,14 @@ thread_local std::unordered_multimap<::sockaddr_in, posix_ap_server_socket_impl: server_socket posix_ap_network_stack::listen(socket_address sa, listen_options opt) { if (_reuseport) - return server_socket(std::make_unique(sa, engine.posix_listen(sa, opt))); + return server_socket(std::make_unique(sa, engine().posix_listen(sa, opt))); else return server_socket(std::make_unique(sa)); } future posix_ap_network_stack::connect(socket_address sa) { - return engine.posix_connect(sa).then([] (pollable_fd fd) { + return engine().posix_connect(sa).then([] (pollable_fd fd) { std::unique_ptr csi(new posix_connected_socket_impl(std::move(fd))); return make_ready_future(connected_socket(std::move(csi))); }); @@ -221,7 +221,7 @@ public: auto sa = make_ipv4_address(bind_address); file_desc fd = file_desc::socket(sa.u.sa.sa_family, SOCK_DGRAM | SOCK_NONBLOCK | SOCK_CLOEXEC, 0); fd.setsockopt(SOL_IP, IP_PKTINFO, true); - if (engine.posix_reuseport_available()) { + if (engine().posix_reuseport_available()) { fd.setsockopt(SOL_SOCKET, SO_REUSEPORT, 1); } fd.bind(sa.u.sa, sizeof(sa.u.sas)); diff --git a/net/posix-stack.hh b/net/posix-stack.hh index 01439d3c81..eb761eb3e2 100644 --- a/net/posix-stack.hh +++ b/net/posix-stack.hh @@ -71,7 +71,7 @@ class posix_network_stack : public network_stack { private: const bool _reuseport; public: - explicit posix_network_stack(boost::program_options::variables_map opts) : _reuseport(engine.posix_reuseport_available()) {} + explicit posix_network_stack(boost::program_options::variables_map opts) : _reuseport(engine().posix_reuseport_available()) {} virtual server_socket listen(socket_address sa, listen_options opts) override; virtual future connect(socket_address sa) override; virtual net::udp_channel make_udp_channel(ipv4_addr addr) override; @@ -85,7 +85,7 @@ class posix_ap_network_stack : public posix_network_stack { private: const bool _reuseport; public: - posix_ap_network_stack(boost::program_options::variables_map opts) : posix_network_stack(std::move(opts)), _reuseport(engine.posix_reuseport_available()) {} + posix_ap_network_stack(boost::program_options::variables_map opts) : posix_network_stack(std::move(opts)), _reuseport(engine().posix_reuseport_available()) {} virtual server_socket listen(socket_address sa, listen_options opts) override; virtual future connect(socket_address sa) override; static future> create(boost::program_options::variables_map opts) { diff --git a/net/proxy.cc b/net/proxy.cc index 22a025dfcc..b5171e606f 100644 --- a/net/proxy.cc +++ b/net/proxy.cc @@ -39,7 +39,7 @@ uint32_t proxy_net_device::send(circular_buffer& p) if (!_moving.empty()) { qp* dev = &_dev->queue_for_cpu(_cpu); - auto cpu = engine.cpu_id(); + auto cpu = engine().cpu_id(); smp::submit_to(_cpu, [this, dev, cpu]() mutable { for(size_t i = 0; i < _moving.size(); i++) { dev->proxy_send(_moving[i].free_on_cpu(cpu, [this] { _send_depth--; })); diff --git a/net/tcp.hh b/net/tcp.hh index 7be5520901..49e7f828bd 100644 --- a/net/tcp.hh +++ b/net/tcp.hh @@ -469,7 +469,7 @@ future::connection> tcp::connect(socket_add do { src_port = _port_dist(_e); id = connid{src_ip, dst_ip, src_port, dst_port}; - } while (_inet._inet.netif()->hash2cpu(id.hash()) != engine.cpu_id() + } while (_inet._inet.netif()->hash2cpu(id.hash()) != engine().cpu_id() || _tcbs.find(id) != _tcbs.end()); auto tcbp = make_lw_shared(*this, id); diff --git a/net/virtio.cc b/net/virtio.cc index a0b8fc7934..c86c89c2fa 100644 --- a/net/virtio.cc +++ b/net/virtio.cc @@ -912,7 +912,7 @@ qp_osv::qp_osv(osv::assigned_virtio &virtio, // Set up interrupts // FIXME: in OSv, the first thing we do in the handler is to call // _rqx.disable_interrupts(). Here in seastar, we only do it much later - // in the main engine. Probably needs to do it like in osv - in the beginning of the handler. + // in the main engine(). Probably needs to do it like in osv - in the beginning of the handler. _virtio.enable_interrupt( 0, [&] { _rxq.wake_notifier_wait(); } ); _virtio.enable_interrupt( diff --git a/tests/blkdiscard_test.cc b/tests/blkdiscard_test.cc index b5702fc649..92cadff242 100644 --- a/tests/blkdiscard_test.cc +++ b/tests/blkdiscard_test.cc @@ -25,7 +25,7 @@ int main(int ac, char** av) { auto&& config = app.configuration(); auto filepath = config["dev"].as(); - engine.open_file_dma(filepath).then([] (file f) { + engine().open_file_dma(filepath).then([] (file f) { auto ft = new file_test{std::move(f)}; ft->f.stat().then([ft] (struct stat st) mutable { @@ -42,7 +42,7 @@ int main(int ac, char** av) { }).then([ft] () mutable { std::cout << "done\n"; delete ft; - engine.exit(0); + engine().exit(0); }); }); }); diff --git a/tests/directory_test.cc b/tests/directory_test.cc index c87edeaa10..1f6e9129d7 100644 --- a/tests/directory_test.cc +++ b/tests/directory_test.cc @@ -25,11 +25,11 @@ int main(int ac, char** av) { } }; return app_template().run(ac, av, [] { - return engine.open_directory(".").then([] (file f) { + return engine().open_directory(".").then([] (file f) { auto l = make_lw_shared(std::move(f)); return l->done().then([l] { // ugly thing to keep *l alive - engine.exit(0); + engine().exit(0); }); }); }); diff --git a/tests/echotest.cc b/tests/echotest.cc index bc3f80c3f4..d1cbbc3480 100644 --- a/tests/echotest.cc +++ b/tests/echotest.cc @@ -101,7 +101,7 @@ int main(int ac, char** av) { dnet->receive([vnet, &rx] (packet p) { return echo_packet(*vnet, std::move(p)); }); - engine.run(); + engine().run(); return 0; } diff --git a/tests/fileiotest.cc b/tests/fileiotest.cc index baf3ca5800..ea59193417 100644 --- a/tests/fileiotest.cc +++ b/tests/fileiotest.cc @@ -14,7 +14,7 @@ struct file_test { int main(int ac, char** av) { static constexpr auto max = 10000; - engine.open_file_dma("testfile.tmp").then([] (file f) { + engine().open_file_dma("testfile.tmp").then([] (file f) { auto ft = new file_test{std::move(f)}; for (size_t i = 0; i < max; ++i) { ft->par.wait().then([ft, i] { @@ -45,7 +45,7 @@ int main(int ac, char** av) { ::exit(0); }); }); - engine.run(); + engine().run(); return 0; } diff --git a/tests/ip_test.cc b/tests/ip_test.cc index 53a8308acd..0bc4cf29e2 100644 --- a/tests/ip_test.cc +++ b/tests/ip_test.cc @@ -20,7 +20,7 @@ int main(int ac, char** av) { interface netif(std::move(vnet)); ipv4 inet(&netif); inet.set_host_address(ipv4_address("192.168.122.2")); - engine.run(); + engine().run(); return 0; } diff --git a/tests/l3_test.cc b/tests/l3_test.cc index 7dd989d7cd..21ced25c51 100644 --- a/tests/l3_test.cc +++ b/tests/l3_test.cc @@ -23,7 +23,7 @@ int main(int ac, char** av) { interface netif(std::move(vnet)); l3_protocol arp(&netif, eth_protocol_num::arp, []{ return std::experimental::optional(); }); dump_arp_packets(arp); - engine.run(); + engine().run(); return 0; } diff --git a/tests/smp_test.cc b/tests/smp_test.cc index 6234c23917..7734f63f58 100644 --- a/tests/smp_test.cc +++ b/tests/smp_test.cc @@ -55,7 +55,7 @@ int main(int ac, char** av) { return report("smp exception", test_smp_exception()); }).then([] { print("\n%d tests / %d failures\n", tests, fails); - engine.exit(fails ? 1 : 0); + engine().exit(fails ? 1 : 0); }); }); } diff --git a/tests/tcp_client.cc b/tests/tcp_client.cc index 0df1f93723..b55d6e077b 100644 --- a/tests/tcp_client.cc +++ b/tests/tcp_client.cc @@ -63,7 +63,7 @@ public: }; void start(ipv4_addr server_addr, std::string mode) { - engine.net().connect(make_ipv4_address(server_addr)).then([this, mode] (connected_socket fd) { + engine().net().connect(make_ipv4_address(server_addr)).then([this, mode] (connected_socket fd) { _sockets.push_back(std::move(fd)); auto conn = new connection(std::move(_sockets[0])); if (mode == "write") { diff --git a/tests/tcp_server.cc b/tests/tcp_server.cc index f226061cc6..92e279d7f6 100644 --- a/tests/tcp_server.cc +++ b/tests/tcp_server.cc @@ -26,7 +26,7 @@ public: future<> listen(ipv4_addr addr) { listen_options lo; lo.reuse_address = true; - _listeners.push_back(engine.listen(make_ipv4_address(addr), lo)); + _listeners.push_back(engine().listen(make_ipv4_address(addr), lo)); do_accepts(_listeners.size() - 1); return make_ready_future<>(); } diff --git a/tests/tcp_test.cc b/tests/tcp_test.cc index 2145f1a277..6f0ff968d4 100644 --- a/tests/tcp_test.cc +++ b/tests/tcp_test.cc @@ -46,8 +46,8 @@ int main(int ac, char** av) { ipv4 inet(&netif); inet.set_host_address(ipv4_address("192.168.122.2")); tcp_test tt(inet); - engine.when_started().then([&tt] { tt.run(); }); - engine.run(); + engine().when_started().then([&tt] { tt.run(); }); + engine().run(); } diff --git a/tests/test-reactor.cc b/tests/test-reactor.cc index 328aa9a992..2b4a280263 100644 --- a/tests/test-reactor.cc +++ b/tests/test-reactor.cc @@ -41,9 +41,9 @@ int main(int ac, char** av) ipv4_addr addr{10000}; listen_options lo; lo.reuse_address = true; - test t(engine.posix_listen(make_ipv4_address(addr), lo)); + test t(engine().posix_listen(make_ipv4_address(addr), lo)); t.start_accept(); - engine.run(); + engine().run(); return 0; } diff --git a/tests/test-utils.hh b/tests/test-utils.hh index e20343b5ea..415c505114 100644 --- a/tests/test-utils.hh +++ b/tests/test-utils.hh @@ -33,18 +33,18 @@ public: boost::program_options::variables_map configuration; auto opts = reactor::get_options_description(); bpo::store(bpo::command_line_parser(0, nullptr).options(opts).run(), configuration); - engine.configure(configuration); - engine.when_started().then([this] { + engine().configure(configuration); + engine().when_started().then([this] { return run_test_case(); }).rescue([] (auto get) { try { get(); - engine.exit(0); + engine().exit(0); } catch (...) { std::terminate(); } }); - engine.run(); + engine().run(); }); t.join(); } diff --git a/tests/udp_client.cc b/tests/udp_client.cc index c69590072a..c7fd99258b 100644 --- a/tests/udp_client.cc +++ b/tests/udp_client.cc @@ -19,7 +19,7 @@ public: void start(ipv4_addr server_addr) { std::cout << "Sending to " << server_addr << std::endl; - _chan = engine.net().make_udp_channel(); + _chan = engine().net().make_udp_channel(); _stats_timer.set_callback([this] { std::cout << "Out: " << n_sent << " pps, \t"; diff --git a/tests/udp_server.cc b/tests/udp_server.cc index 05a194ea8c..4458fca684 100644 --- a/tests/udp_server.cc +++ b/tests/udp_server.cc @@ -17,7 +17,7 @@ private: public: void start(uint16_t port) { ipv4_addr listen_addr{port}; - _chan = engine.net().make_udp_channel(listen_addr); + _chan = engine().net().make_udp_channel(listen_addr); _stats_timer.set_callback([this] { std::cout << "Out: " << _n_sent << " pps" << std::endl; diff --git a/tests/udp_zero_copy.cc b/tests/udp_zero_copy.cc index 4e12443853..70c1027caa 100644 --- a/tests/udp_zero_copy.cc +++ b/tests/udp_zero_copy.cc @@ -52,7 +52,7 @@ public: } void start(int chunk_size, bool copy, size_t mem_size) { ipv4_addr listen_addr{10000}; - _chan = engine.net().make_udp_channel(listen_addr); + _chan = engine().net().make_udp_channel(listen_addr); std::cout << "Listening on " << listen_addr << std::endl; From 5454c79613054ee87cfd9716bc1dc1654d6f7b8b Mon Sep 17 00:00:00 2001 From: Gleb Natapov Date: Tue, 27 Jan 2015 14:03:49 +0200 Subject: [PATCH 8/8] core: allocate reactors on each cpu instead of using thread_local variable I see TLS init function for engine high in cache miss profile. And yes, this patch has #define. --- core/reactor.cc | 19 +++++++++++++++++-- core/reactor.hh | 5 +++-- 2 files changed, 20 insertions(+), 4 deletions(-) diff --git a/core/reactor.cc b/core/reactor.cc index ffe7f12dd8..8014d227c4 100644 --- a/core/reactor.cc +++ b/core/reactor.cc @@ -1253,6 +1253,19 @@ void smp::pin(unsigned cpu_id) { } #endif +void smp::allocate_reactor() { + static thread_local std::unique_ptr reactor_holder; + + assert(!reactor_holder); + + // we cannot just write "local_engin = new reactor" since reactor's constructor + // uses local_engine + auto buf = new (with_alignment(64)) char[sizeof(reactor)]; + local_engine = reinterpret_cast(buf); + new (buf) reactor; + reactor_holder.reset(local_engine); +} + void smp::configure(boost::program_options::variables_map configuration) { smp::count = 1; @@ -1300,6 +1313,7 @@ void smp::configure(boost::program_options::variables_map configuration) sigfillset(&mask); auto r = ::sigprocmask(SIG_BLOCK, &mask, NULL); throw_system_error_on(r == -1); + allocate_reactor(); engine()._id = i; start_all_queues(); inited.wait(); @@ -1308,6 +1322,8 @@ void smp::configure(boost::program_options::variables_map configuration) }); } + allocate_reactor(); + #ifdef HAVE_DPDK auto it = _threads.begin(); RTE_LCORE_FOREACH_SLAVE(i) { @@ -1324,8 +1340,7 @@ void smp::configure(boost::program_options::variables_map configuration) __thread size_t future_avail_count = 0; __thread size_t task_quota = 0; -thread_local reactor local_engine; - +__thread reactor* local_engine; class reactor_notifier_epoll : public reactor_notifier { writeable_eventfd _write; diff --git a/core/reactor.hh b/core/reactor.hh index 0bde67064c..0501c77802 100644 --- a/core/reactor.hh +++ b/core/reactor.hh @@ -811,11 +811,11 @@ reactor::make_pollfn(Func&& func) { return std::make_unique(std::forward(func)); } -extern thread_local reactor local_engine; +extern __thread reactor* local_engine; extern __thread size_t task_quota; inline reactor& engine() { - return local_engine; + return *local_engine; } class smp { @@ -879,6 +879,7 @@ public: private: static void start_all_queues(); static void pin(unsigned cpu_id); + static void allocate_reactor(); public: static unsigned count; };