diff --git a/apps/httpd/httpd.cc b/apps/httpd/httpd.cc index 0d9b06782d..a6ee4cb6e8 100644 --- a/apps/httpd/httpd.cc +++ b/apps/httpd/httpd.cc @@ -39,7 +39,7 @@ public: future<> listen(ipv4_addr addr) { listen_options lo; lo.reuse_address = true; - _listeners.push_back(engine.listen(make_ipv4_address(addr), lo)); + _listeners.push_back(engine().listen(make_ipv4_address(addr), lo)); do_accepts(_listeners.size() - 1); return make_ready_future<>(); } diff --git a/apps/memcached/memcache.cc b/apps/memcached/memcache.cc index 1f147f8ed5..d7229f4175 100644 --- a/apps/memcached/memcache.cc +++ b/apps/memcached/memcache.cc @@ -1101,7 +1101,7 @@ public: } ss << histo[i] << "\n"; } - return {engine.cpu_id(), make_foreign(make_lw_shared(ss.str()))}; + return {engine().cpu_id(), make_foreign(make_lw_shared(ss.str()))}; } future<> stop() { return make_ready_future<>(); } @@ -1130,7 +1130,7 @@ public: // The caller must keep @insertion live until the resulting future resolves. future set(item_insertion_data& insertion) { auto cpu = get_cpu(insertion.key); - if (engine.cpu_id() == cpu) { + if (engine().cpu_id() == cpu) { return make_ready_future(_peers.local().set(insertion)); } return _peers.invoke_on(cpu, &cache::remote_set, std::ref(insertion)); @@ -1139,7 +1139,7 @@ public: // The caller must keep @insertion live until the resulting future resolves. future add(item_insertion_data& insertion) { auto cpu = get_cpu(insertion.key); - if (engine.cpu_id() == cpu) { + if (engine().cpu_id() == cpu) { return make_ready_future(_peers.local().add(insertion)); } return _peers.invoke_on(cpu, &cache::remote_add, std::ref(insertion)); @@ -1148,7 +1148,7 @@ public: // The caller must keep @insertion live until the resulting future resolves. future replace(item_insertion_data& insertion) { auto cpu = get_cpu(insertion.key); - if (engine.cpu_id() == cpu) { + if (engine().cpu_id() == cpu) { return make_ready_future(_peers.local().replace(insertion)); } return _peers.invoke_on(cpu, &cache::remote_replace, std::ref(insertion)); @@ -1169,7 +1169,7 @@ public: // The caller must keep @insertion live until the resulting future resolves. future cas(item_insertion_data& insertion, typename item::version_type version) { auto cpu = get_cpu(insertion.key); - if (engine.cpu_id() == cpu) { + if (engine().cpu_id() == cpu) { return make_ready_future(_peers.local().cas(insertion, version)); } return _peers.invoke_on(cpu, &cache::remote_cas, std::ref(insertion), std::move(version)); @@ -1182,7 +1182,7 @@ public: // The caller must keep @key live until the resulting future resolves. future, bool>> incr(item_key& key, uint64_t delta) { auto cpu = get_cpu(key); - if (engine.cpu_id() == cpu) { + if (engine().cpu_id() == cpu) { return make_ready_future, bool>>( _peers.local().incr(key, delta)); } @@ -1192,7 +1192,7 @@ public: // The caller must keep @key live until the resulting future resolves. future, bool>> decr(item_key& key, uint64_t delta) { auto cpu = get_cpu(key); - if (engine.cpu_id() == cpu) { + if (engine().cpu_id() == cpu) { return make_ready_future, bool>>( _peers.local().decr(key, delta)); } @@ -1658,7 +1658,7 @@ public: } void start() { - _chan = engine.net().make_udp_channel({_port}); + _chan = engine().net().make_udp_channel({_port}); keep_doing([this] { return _chan.receive().then([this](udp_datagram dgram) { packet& p = dgram.get_data(); @@ -1734,7 +1734,7 @@ public: void start() { listen_options lo; lo.reuse_address = true; - _listener = engine.listen(make_ipv4_address({_port}), lo); + _listener = engine().listen(make_ipv4_address({_port}), lo); keep_doing([this] { return _listener->accept().then([this] (connected_socket fd, socket_address addr) mutable { auto conn = make_lw_shared(std::move(fd), addr, _cache, _system_stats); @@ -1811,10 +1811,10 @@ int start_instance(int ac, char** av) { ; return app.run(ac, av, [&] { - engine.at_exit([&] { return tcp_server.stop(); }); - engine.at_exit([&] { return udp_server.stop(); }); - engine.at_exit([&] { return cache_peers.stop(); }); - engine.at_exit([&] { return system_stats.stop(); }); + engine().at_exit([&] { return tcp_server.stop(); }); + engine().at_exit([&] { return udp_server.stop(); }); + engine().at_exit([&] { return cache_peers.stop(); }); + engine().at_exit([&] { return system_stats.stop(); }); auto&& config = app.configuration(); return cache_peers.start().then([&system_stats] { @@ -1822,7 +1822,7 @@ int start_instance(int ac, char** av) { }).then([&] { if (WithFlashCache) { auto device_path = config["device"].as(); - return engine.open_file_dma(device_path).then([&] (file f) { + return engine().open_file_dma(device_path).then([&] (file f) { auto dev = make_lw_shared({std::move(f)}); return dev->f().stat().then([&, dev] (struct stat st) mutable { assert(S_ISBLK(st.st_mode)); @@ -1849,7 +1849,7 @@ int start_instance(int ac, char** av) { }).then([&tcp_server] { return tcp_server.invoke_on_all(&memcache::tcp_server::start); }).then([&] { - if (engine.net().has_per_core_namespace()) { + if (engine().net().has_per_core_namespace()) { return udp_server.start(std::ref(cache), std::ref(system_stats)); } else { return udp_server.start_single(std::ref(cache), std::ref(system_stats)); diff --git a/core/app-template.cc b/core/app-template.cc index 1b09daa5b2..2669e71ac8 100644 --- a/core/app-template.cc +++ b/core/app-template.cc @@ -58,7 +58,7 @@ app_template::run(int ac, char ** av, std::function&& func) { } smp::configure(configuration); _configuration = {std::move(configuration)}; - engine.when_started().then([this] { + engine().when_started().then([this] { scollectd::configure( this->configuration()); }).then( std::move(func) @@ -67,10 +67,10 @@ app_template::run(int ac, char ** av, std::function&& func) { get_ex(); } catch (std::exception& ex) { std::cout << "program failed with uncaught exception: " << ex.what() << "\n"; - engine.exit(1); + engine().exit(1); } }); - return engine.run(); + return engine().run(); } diff --git a/core/distributed.hh b/core/distributed.hh index 15b1e22d61..7949d2f597 100644 --- a/core/distributed.hh +++ b/core/distributed.hh @@ -177,7 +177,7 @@ distributed::invoke_on_all(void (Service::*func)(Args...), Args... args template Service& distributed::local() { - return *_instances[engine.cpu_id()]; + return *_instances[engine().cpu_id()]; } // Smart pointer wrapper which makes it safe to move across CPUs. @@ -190,19 +190,19 @@ private: unsigned _cpu; private: bool on_origin() { - return engine.cpu_id() == _cpu; + return engine().cpu_id() == _cpu; } public: using element_type = typename PtrType::element_type; foreign_ptr() : _value(PtrType()) - , _cpu(engine.cpu_id()) { + , _cpu(engine().cpu_id()) { } foreign_ptr(std::nullptr_t) : foreign_ptr() {} foreign_ptr(PtrType value) : _value(std::move(value)) - , _cpu(engine.cpu_id()) { + , _cpu(engine().cpu_id()) { } // The type is intentionally non-copyable because copies // are expensive because each copy requires across-CPU call. diff --git a/core/reactor.cc b/core/reactor.cc index 8cefdac565..ffe7f12dd8 100644 --- a/core/reactor.cc +++ b/core/reactor.cc @@ -153,7 +153,7 @@ future<> reactor_backend_epoll::get_epoll_future(pollable_fd_state& pfd, eevt.data.ptr = &pfd; int r = ::epoll_ctl(_epollfd.get(), ctl, pfd.fd.get(), &eevt); assert(r == 0); - engine.start_epoll(); + engine().start_epoll(); } pfd.*pr = promise<>(); return (pfd.*pr).get_future(); @@ -273,7 +273,7 @@ bool reactor::process_io() future posix_file_impl::write_dma(uint64_t pos, const void* buffer, size_t len) { - return engine.submit_io([this, pos, buffer, len] (iocb& io) { + return engine().submit_io([this, pos, buffer, len] (iocb& io) { io_prep_pwrite(&io, _fd, const_cast(buffer), len, pos); }).then([] (io_event ev) { throw_kernel_error(long(ev.res)); @@ -283,7 +283,7 @@ posix_file_impl::write_dma(uint64_t pos, const void* buffer, size_t len) { future posix_file_impl::write_dma(uint64_t pos, std::vector iov) { - return engine.submit_io([this, pos, iov = std::move(iov)] (iocb& io) { + return engine().submit_io([this, pos, iov = std::move(iov)] (iocb& io) { io_prep_pwritev(&io, _fd, iov.data(), iov.size(), pos); }).then([] (io_event ev) { throw_kernel_error(long(ev.res)); @@ -293,7 +293,7 @@ posix_file_impl::write_dma(uint64_t pos, std::vector iov) { future posix_file_impl::read_dma(uint64_t pos, void* buffer, size_t len) { - return engine.submit_io([this, pos, buffer, len] (iocb& io) { + return engine().submit_io([this, pos, buffer, len] (iocb& io) { io_prep_pread(&io, _fd, buffer, len, pos); }).then([] (io_event ev) { throw_kernel_error(long(ev.res)); @@ -303,7 +303,7 @@ posix_file_impl::read_dma(uint64_t pos, void* buffer, size_t len) { future posix_file_impl::read_dma(uint64_t pos, std::vector iov) { - return engine.submit_io([this, pos, iov = std::move(iov)] (iocb& io) { + return engine().submit_io([this, pos, iov = std::move(iov)] (iocb& io) { io_prep_preadv(&io, _fd, iov.data(), iov.size(), pos); }).then([] (io_event ev) { throw_kernel_error(long(ev.res)); @@ -333,7 +333,7 @@ reactor::open_directory(sstring name) { future<> posix_file_impl::flush(void) { - return engine._thread_pool.submit>([this] { + return engine()._thread_pool.submit>([this] { return wrap_syscall(::fsync(_fd)); }).then([] (syscall_result sr) { sr.throw_if_error(); @@ -343,7 +343,7 @@ posix_file_impl::flush(void) { future posix_file_impl::stat(void) { - return engine._thread_pool.submit>([this] { + return engine()._thread_pool.submit>([this] { struct stat st; auto ret = ::fstat(_fd, &st); return wrap_syscall(ret, st); @@ -355,7 +355,7 @@ posix_file_impl::stat(void) { future<> posix_file_impl::discard(uint64_t offset, uint64_t length) { - return engine._thread_pool.submit>([this, offset, length] () mutable { + return engine()._thread_pool.submit>([this, offset, length] () mutable { return wrap_syscall(::fallocate(_fd, FALLOC_FL_PUNCH_HOLE|FALLOC_FL_KEEP_SIZE, offset, length)); }).then([] (syscall_result sr) { @@ -366,7 +366,7 @@ posix_file_impl::discard(uint64_t offset, uint64_t length) { future<> blockdev_file_impl::discard(uint64_t offset, uint64_t length) { - return engine._thread_pool.submit>([this, offset, length] () mutable { + return engine()._thread_pool.submit>([this, offset, length] () mutable { uint64_t range[2] { offset, length }; return wrap_syscall(::ioctl(_fd, BLKDISCARD, &range)); }).then([] (syscall_result sr) { @@ -384,7 +384,7 @@ posix_file_impl::size(void) { future blockdev_file_impl::size(void) { - return engine._thread_pool.submit>([this] { + return engine()._thread_pool.submit>([this] { size_t size; int ret = ::ioctl(_fd, BLKGETSIZE64, &size); return wrap_syscall(ret, size); @@ -431,7 +431,7 @@ posix_file_impl::list_directory(std::function (directory_entry de)> nex auto eofcond = [w] { return w->eof; }; return do_until(eofcond, [w, this] { if (w->current == w->total) { - return engine._thread_pool.submit>([w , this] () { + return engine()._thread_pool.submit>([w , this] () { auto ret = ::syscall(__NR_getdents, _fd, reinterpret_cast(w->buffer), sizeof(w->buffer)); return wrap_syscall(ret); }).then([w] (syscall_result ret) { @@ -524,13 +524,13 @@ future<> reactor::run_exit_tasks() { } void reactor::stop() { - assert(engine._id == 0); + assert(engine()._id == 0); run_exit_tasks().then([this] { auto sem = new semaphore(0); for (unsigned i = 1; i < smp::count; i++) { smp::submit_to<>(i, []() { - return engine.run_exit_tasks().then([] { - engine._stopped = true; + return engine().run_exit_tasks().then([] { + engine()._stopped = true; }); }).then([sem, i]() { sem->signal(); @@ -558,7 +558,7 @@ reactor::receive_signal(int signo) { } void sigaction(int signo, siginfo_t* siginfo, void* ignore) { - engine._pending_signals.fetch_or(1ull << signo, std::memory_order_relaxed); + engine()._pending_signals.fetch_or(1ull << signo, std::memory_order_relaxed); } bool reactor::poll_signal() { @@ -691,7 +691,7 @@ int reactor::run() { _network_stack = std::move(stack); for (unsigned c = 0; c < smp::count; c++) { smp::submit_to(c, [] { - engine._cpu_started.signal(); + engine()._cpu_started.signal(); }); } }); @@ -804,7 +804,7 @@ public: explicit registration_task(poller* p) : _p(p) {} virtual void run() noexcept override { if (_p) { - engine.register_poller(_p->_pollfn.get()); + engine().register_poller(_p->_pollfn.get()); _p->_registration_task = nullptr; } } @@ -822,7 +822,7 @@ private: public: explicit deregistration_task(std::unique_ptr&& p) : _p(std::move(p)) {} virtual void run() noexcept override { - engine.unregister_poller(_p.get()); + engine().unregister_poller(_p.get()); } }; @@ -862,7 +862,7 @@ reactor::poller::do_register() { // the poller instead. auto task = std::make_unique(this); auto tmp = task.get(); - engine.add_task(std::move(task)); + engine().add_task(std::move(task)); _registration_task = tmp; } @@ -883,8 +883,8 @@ reactor::poller::~poller() { auto dummy = make_pollfn([] { return false; }); auto dummy_p = dummy.get(); auto task = std::make_unique(std::move(dummy)); - engine.add_task(std::move(task)); - engine.replace_poller(_pollfn.get(), dummy_p); + engine().add_task(std::move(task)); + engine().replace_poller(_pollfn.get(), dummy_p); } } } @@ -965,7 +965,7 @@ void smp_message_queue::submit_item(smp_message_queue::work_item* item) { void smp_message_queue::respond(work_item* item) { _completed_fifo.push_back(item); - if (_completed_fifo.size() >= batch_size || engine._stopped) { + if (_completed_fifo.size() >= batch_size || engine()._stopped) { flush_response_batch(); } } @@ -1027,7 +1027,7 @@ size_t smp_message_queue::process_incoming() { void smp_message_queue::start(unsigned cpuid) { _tx.init(); char instance[10]; - std::snprintf(instance, sizeof(instance), "%u-%u", engine.cpu_id(), cpuid); + std::snprintf(instance, sizeof(instance), "%u-%u", engine().cpu_id(), cpuid); _collectd_regs = { // queue_length value:GAUGE:0:U // Absolute value of num packets in last tx batch. @@ -1076,7 +1076,7 @@ void smp_message_queue::start(unsigned cpuid) { #ifndef HAVE_OSV thread_pool::thread_pool() : _worker_thread([this] { work(); }), _notify(pthread_self()) { keep_doing([this] { - return engine.receive_signal(SIGUSR1).then([this] { inter_thread_wq.complete(); }); + return engine().receive_signal(SIGUSR1).then([this] { inter_thread_wq.complete(); }); }); } @@ -1132,7 +1132,7 @@ file_desc readable_eventfd::try_create_eventfd(size_t initial) { } future readable_eventfd::wait() { - return engine.readable(*_fd._s).then([this] { + return engine().readable(*_fd._s).then([this] { uint64_t count; int r = ::read(_fd.get_fd(), &count, sizeof(count)); assert(r == sizeof(count)); @@ -1141,7 +1141,7 @@ future readable_eventfd::wait() { } void schedule(std::unique_ptr t) { - engine.add_task(std::move(t)); + engine().add_task(std::move(t)); } bool operator==(const ::sockaddr_in a, const ::sockaddr_in b) { @@ -1219,8 +1219,8 @@ unsigned smp::count = 1; void smp::start_all_queues() { for (unsigned c = 0; c < count; c++) { - if (c != engine.cpu_id()) { - _qs[c][engine.cpu_id()].start(c); + if (c != engine().cpu_id()) { + _qs[c][engine().cpu_id()].start(c); } } } @@ -1300,11 +1300,11 @@ void smp::configure(boost::program_options::variables_map configuration) sigfillset(&mask); auto r = ::sigprocmask(SIG_BLOCK, &mask, NULL); throw_system_error_on(r == -1); - engine._id = i; + engine()._id = i; start_all_queues(); inited.wait(); - engine.configure(configuration); - engine.run(); + engine().configure(configuration); + engine().run(); }); } @@ -1317,14 +1317,14 @@ void smp::configure(boost::program_options::variables_map configuration) start_all_queues(); inited.wait(); - engine.configure(configuration); - engine._lowres_clock = std::make_unique(); + engine().configure(configuration); + engine()._lowres_clock = std::make_unique(); } __thread size_t future_avail_count = 0; __thread size_t task_quota = 0; -thread_local reactor engine; +thread_local reactor local_engine; class reactor_notifier_epoll : public reactor_notifier { @@ -1361,7 +1361,7 @@ class reactor_notifier_osv : bool _needed = false; public: virtual future<> wait() override { - return engine.notified(this); + return engine().notified(this); } virtual void signal() override { wake(); diff --git a/core/reactor.hh b/core/reactor.hh index ad181fb7c4..0bde67064c 100644 --- a/core/reactor.hh +++ b/core/reactor.hh @@ -811,9 +811,13 @@ reactor::make_pollfn(Func&& func) { return std::make_unique(std::forward(func)); } -extern thread_local reactor engine; +extern thread_local reactor local_engine; extern __thread size_t task_quota; +inline reactor& engine() { + return local_engine; +} + class smp { #if HAVE_DPDK using thread_adaptor = std::function; @@ -837,10 +841,10 @@ public: template static std::result_of_t submit_to(unsigned t, Func func, std::enable_if_t::value, void*> = nullptr) { - if (t == engine.cpu_id()) { + if (t == engine().cpu_id()) { return func(); } else { - return _qs[t][engine.cpu_id()].submit(std::move(func)); + return _qs[t][engine().cpu_id()].submit(std::move(func)); } } template @@ -861,11 +865,11 @@ public: static bool poll_queues() { size_t got = 0; for (unsigned i = 0; i < count; i++) { - if (engine.cpu_id() != i) { - auto& rxq = _qs[engine.cpu_id()][i]; + if (engine().cpu_id() != i) { + auto& rxq = _qs[engine().cpu_id()][i]; rxq.flush_response_batch(); got += rxq.process_incoming(); - auto& txq = _qs[i][engine._id]; + auto& txq = _qs[i][engine()._id]; txq.flush_request_batch(); got += txq.process_completions(); } @@ -881,7 +885,7 @@ public: inline pollable_fd_state::~pollable_fd_state() { - engine.forget(*this); + engine().forget(*this); } class data_source_impl { @@ -1315,32 +1319,32 @@ output_stream::flush() { inline future pollable_fd::read_some(char* buffer, size_t size) { - return engine.read_some(*_s, buffer, size); + return engine().read_some(*_s, buffer, size); } inline future pollable_fd::read_some(uint8_t* buffer, size_t size) { - return engine.read_some(*_s, buffer, size); + return engine().read_some(*_s, buffer, size); } inline future pollable_fd::read_some(const std::vector& iov) { - return engine.read_some(*_s, iov); + return engine().read_some(*_s, iov); } inline future<> pollable_fd::write_all(const char* buffer, size_t size) { - return engine.write_all(*_s, buffer, size); + return engine().write_all(*_s, buffer, size); } inline future<> pollable_fd::write_all(const uint8_t* buffer, size_t size) { - return engine.write_all(*_s, buffer, size); + return engine().write_all(*_s, buffer, size); } inline future pollable_fd::write_some(net::packet& p) { - return engine.writeable(*_s).then([this, &p] () mutable { + return engine().writeable(*_s).then([this, &p] () mutable { static_assert(offsetof(iovec, iov_base) == offsetof(net::fragment, base) && sizeof(iovec::iov_base) == sizeof(net::fragment::base) && offsetof(iovec, iov_len) == offsetof(net::fragment, size) && @@ -1374,22 +1378,22 @@ future<> pollable_fd::write_all(net::packet& p) { inline future<> pollable_fd::readable() { - return engine.readable(*_s); + return engine().readable(*_s); } inline future<> pollable_fd::writeable() { - return engine.writeable(*_s); + return engine().writeable(*_s); } inline future pollable_fd::accept() { - return engine.accept(*_s); + return engine().accept(*_s); } inline future pollable_fd::recvmsg(struct msghdr *msg) { - return engine.readable(*_s).then([this, msg] { + return engine().readable(*_s).then([this, msg] { auto r = get_file_desc().recvmsg(msg, 0); if (!r) { return recvmsg(msg); @@ -1408,7 +1412,7 @@ future pollable_fd::recvmsg(struct msghdr *msg) { inline future pollable_fd::sendmsg(struct msghdr* msg) { - return engine.writeable(*_s).then([this, msg] () mutable { + return engine().writeable(*_s).then([this, msg] () mutable { auto r = get_file_desc().sendmsg(msg, 0); if (!r) { return sendmsg(msg); @@ -1425,7 +1429,7 @@ future pollable_fd::sendmsg(struct msghdr* msg) { inline future pollable_fd::sendto(socket_address addr, const void* buf, size_t len) { - return engine.writeable(*_s).then([this, buf, len, addr] () mutable { + return engine().writeable(*_s).then([this, buf, len, addr] () mutable { auto r = get_file_desc().sendto(addr, buf, len, 0); if (!r) { return sendto(std::move(addr), buf, len); @@ -1442,7 +1446,7 @@ template inline timer::~timer() { if (_queued) { - engine.del_timer(this); + engine().del_timer(this); } } @@ -1460,7 +1464,7 @@ void timer::arm(time_point until, boost::optional period) { _armed = true; _expired = false; _expiry = until; - engine.add_timer(this); + engine().add_timer(this); _queued = true; } @@ -1493,7 +1497,7 @@ bool timer::cancel() { } _armed = false; if (_queued) { - engine.del_timer(this); + engine().del_timer(this); _queued = false; } return true; diff --git a/core/scollectd.cc b/core/scollectd.cc index 4fb89e5428..c14519d551 100644 --- a/core/scollectd.cc +++ b/core/scollectd.cc @@ -93,7 +93,7 @@ public: _period = period; _addr = addr; _host = host; - _chan = engine.net().make_udp_channel(); + _chan = engine().net().make_udp_channel(); _timer.set_callback(std::bind(&impl::run, this)); // dogfood ourselves @@ -285,7 +285,7 @@ private: // Optional put_cached(part_type::PluginInst, id.plugin_instance() == per_cpu_plugin_instance ? - std::to_string(engine.cpu_id()) : id.plugin_instance()); + std::to_string(engine().cpu_id()) : id.plugin_instance()); put_cached(part_type::Type, id.type()); // Optional put_cached(part_type::TypeInst, id.type_instance()); diff --git a/core/xen/evtchn.cc b/core/xen/evtchn.cc index d696120d35..1f66ea375d 100644 --- a/core/xen/evtchn.cc +++ b/core/xen/evtchn.cc @@ -136,7 +136,7 @@ class kernel_evtchn: public evtchn { public: kernel_evtchn(unsigned otherend) : evtchn(otherend) - , _notified(engine.make_reactor_notifier()) {} + , _notified(engine().make_reactor_notifier()) {} virtual port bind() override; virtual void notify(int port) override; }; diff --git a/net/dhcp.cc b/net/dhcp.cc index 255cf80669..8933a43702 100644 --- a/net/dhcp.cc +++ b/net/dhcp.cc @@ -316,7 +316,7 @@ public: return make_ready_future<>(); } handled = true; - auto src_cpu = engine.cpu_id(); + auto src_cpu = engine().cpu_id(); if (src_cpu == 0) { return process_packet(std::move(p), dhp, opt_off); } diff --git a/net/dpdk.cc b/net/dpdk.cc index 2a946bdce8..8e39b88aeb 100644 --- a/net/dpdk.cc +++ b/net/dpdk.cc @@ -136,7 +136,7 @@ public: dpdk_device(uint8_t port_idx, uint16_t num_queues) : _port_idx(port_idx) , _num_queues(num_queues) - , _home_cpu(engine.cpu_id()) { + , _home_cpu(engine().cpu_id()) { /* now initialise the port we will use */ int ret = init_port_start(); diff --git a/net/ip.cc b/net/ip.cc index e9f3094822..d8e2783013 100644 --- a/net/ip.cc +++ b/net/ip.cc @@ -151,7 +151,7 @@ ipv4::handle_received_packet(packet p, ethernet_address from) { auto dropped_size = frag.mem_size; auto& ip_data = frag.data.map.begin()->second; // Choose a cpu to forward this packet - auto cpu_id = engine.cpu_id(); + auto cpu_id = engine().cpu_id(); auto l4 = _l4[h.ip_proto]; if (l4) { size_t l4_offset = 0; @@ -163,7 +163,7 @@ ipv4::handle_received_packet(packet p, ethernet_address from) { } // No need to forward if the dst cpu is the current cpu - if (cpu_id == engine.cpu_id()) { + if (cpu_id == engine().cpu_id()) { l4->received(std::move(ip_data), h.src_ip, h.dst_ip); } else { auto to = _netif->hw_address(); diff --git a/net/native-stack.cc b/net/native-stack.cc index 1bd542678b..2ffdd8f24a 100644 --- a/net/native-stack.cc +++ b/net/native-stack.cc @@ -77,7 +77,7 @@ void create_native_net_device(boost::program_options::variables_map opts) { std::shared_ptr sdev(dev.release()); for (unsigned i = 0; i < smp::count; i++) { smp::submit_to(i, [opts, sdev] { - uint16_t qid = engine.cpu_id(); + uint16_t qid = engine().cpu_id(); if (qid < sdev->hw_queues_count()) { auto qp = sdev->init_local_queue(opts, qid); std::map cpu_weights; @@ -130,7 +130,7 @@ public: virtual udp_channel make_udp_channel(ipv4_addr addr) override; virtual future<> initialize() override; static future> create(boost::program_options::variables_map opts) { - if (engine.cpu_id() == 0) { + if (engine().cpu_id() == 0) { create_native_net_device(opts); } return ready_promise.get_future(); @@ -199,7 +199,7 @@ future<> native_network_stack::run_dhcp(bool is_renew, const dhcp::lease& res) { // Hijack the ip-stack. for (unsigned i = 0; i < smp::count; i++) { smp::submit_to(i, [d] { - auto & ns = static_cast(engine.net()); + auto & ns = static_cast(engine().net()); ns.set_ipv4_packet_filter(d->get_ipv4_filter()); }); } @@ -209,7 +209,7 @@ future<> native_network_stack::run_dhcp(bool is_renew, const dhcp::lease& res) { return fut.then([this, d, is_renew](bool success, const dhcp::lease & res) { for (unsigned i = 0; i < smp::count; i++) { smp::submit_to(i, [] { - auto & ns = static_cast(engine.net()); + auto & ns = static_cast(engine().net()); ns.set_ipv4_packet_filter(nullptr); }); } @@ -228,12 +228,12 @@ void native_network_stack::on_dhcp(bool success, const dhcp::lease & res, bool i _config.set_value(); } - if (engine.cpu_id() == 0) { + if (engine().cpu_id() == 0) { // And the other cpus, which, in the case of initial discovery, // will be waiting for us. for (unsigned i = 1; i < smp::count; i++) { smp::submit_to(i, [success, res, is_renew]() { - auto & ns = static_cast(engine.net()); + auto & ns = static_cast(engine().net()); ns.on_dhcp(success, res, is_renew); }); } @@ -259,7 +259,7 @@ future<> native_network_stack::initialize() { // Only run actual discover on main cpu. // All other cpus must simply for main thread to complete and signal them. - if (engine.cpu_id() == 0) { + if (engine().cpu_id() == 0) { run_dhcp(); } return _config.get_future(); @@ -270,7 +270,7 @@ void arp_learn(ethernet_address l2, ipv4_address l3) { for (unsigned i = 0; i < smp::count; i++) { smp::submit_to(i, [l2, l3] { - auto & ns = static_cast(engine.net()); + auto & ns = static_cast(engine().net()); ns.arp_learn(l2, l3); }); } diff --git a/net/net.cc b/net/net.cc index 3de9e7d7da..d266a4bb3a 100644 --- a/net/net.cc +++ b/net/net.cc @@ -77,7 +77,7 @@ qp::~qp() { void qp::configure_proxies(const std::map& cpu_weights) { assert(!cpu_weights.empty()); - if ((cpu_weights.size() == 1 && cpu_weights.begin()->first == engine.cpu_id())) { + if ((cpu_weights.size() == 1 && cpu_weights.begin()->first == engine().cpu_id())) { // special case queue sending to self only, to avoid requiring a hash value return; } @@ -113,15 +113,15 @@ void qp::build_sw_reta(const std::map& cpu_weights) { subscription device::receive(std::function (packet)> next_packet) { - auto sub = _queues[engine.cpu_id()]->_rx_stream.listen(std::move(next_packet)); - _queues[engine.cpu_id()]->rx_start(); + auto sub = _queues[engine().cpu_id()]->_rx_stream.listen(std::move(next_packet)); + _queues[engine().cpu_id()]->rx_start(); return std::move(sub); } void device::set_local_queue(std::unique_ptr dev) { - assert(!_queues[engine.cpu_id()]); - _queues[engine.cpu_id()] = dev.get(); - engine.at_destroy([dev = std::move(dev)] {}); + assert(!_queues[engine().cpu_id()]); + _queues[engine().cpu_id()] = dev.get(); + engine().at_destroy([dev = std::move(dev)] {}); } @@ -181,7 +181,7 @@ void interface::forward(unsigned cpuid, packet p) { if (queue_depth < 1000) { queue_depth++; - auto src_cpu = engine.cpu_id(); + auto src_cpu = engine().cpu_id(); smp::submit_to(cpuid, [this, p = std::move(p), src_cpu]() mutable { _dev->l2receive(p.free_on_cpu(src_cpu)); }).then([] { @@ -196,7 +196,7 @@ future<> interface::dispatch_packet(packet p) { auto i = _proto_map.find(ntoh(eh->eth_proto)); if (i != _proto_map.end()) { l3_rx_stream& l3 = i->second; - auto fw = _dev->forward_dst(engine.cpu_id(), [&p, &l3] () { + auto fw = _dev->forward_dst(engine().cpu_id(), [&p, &l3] () { auto hwrss = p.rss_hash(); if (hwrss) { return hwrss.value(); @@ -208,7 +208,7 @@ future<> interface::dispatch_packet(packet p) { return 0u; } }); - if (fw != engine.cpu_id()) { + if (fw != engine().cpu_id()) { forward(fw, std::move(p)); } else { auto h = ntoh(*eh); diff --git a/net/net.hh b/net/net.hh index 401c166361..7016c3b49a 100644 --- a/net/net.hh +++ b/net/net.hh @@ -169,8 +169,8 @@ public: } virtual ~device() {}; qp& queue_for_cpu(unsigned cpu) { return *_queues[cpu]; } - qp& local_queue() { return queue_for_cpu(engine.cpu_id()); } - void l2receive(packet p) { _queues[engine.cpu_id()]->_rx_stream.produce(std::move(p)); } + qp& local_queue() { return queue_for_cpu(engine().cpu_id()); } + void l2receive(packet p) { _queues[engine().cpu_id()]->_rx_stream.produce(std::move(p)); } subscription receive(std::function (packet)> next_packet); virtual ethernet_address hw_address() = 0; virtual net::hw_features hw_features() = 0; diff --git a/net/posix-stack.cc b/net/posix-stack.cc index 7e565000c2..974df87d20 100644 --- a/net/posix-stack.cc +++ b/net/posix-stack.cc @@ -29,7 +29,7 @@ posix_server_socket_impl::accept() { static unsigned balance = 0; auto cpu = balance++ % smp::count; - if (cpu == engine.cpu_id()) { + if (cpu == engine().cpu_id()) { std::unique_ptr csi(new posix_connected_socket_impl(std::move(fd))); return make_ready_future( connected_socket(std::move(csi)), sa); @@ -126,14 +126,14 @@ posix_data_sink_impl::put(packet p) { server_socket posix_network_stack::listen(socket_address sa, listen_options opt) { if (_reuseport) - return server_socket(std::make_unique(sa, engine.posix_listen(sa, opt))); + return server_socket(std::make_unique(sa, engine().posix_listen(sa, opt))); else - return server_socket(std::make_unique(sa, engine.posix_listen(sa, opt))); + return server_socket(std::make_unique(sa, engine().posix_listen(sa, opt))); } future posix_network_stack::connect(socket_address sa) { - return engine.posix_connect(sa).then([] (pollable_fd fd) { + return engine().posix_connect(sa).then([] (pollable_fd fd) { std::unique_ptr csi(new posix_connected_socket_impl(std::move(fd))); return make_ready_future(connected_socket(std::move(csi))); }); @@ -145,14 +145,14 @@ thread_local std::unordered_multimap<::sockaddr_in, posix_ap_server_socket_impl: server_socket posix_ap_network_stack::listen(socket_address sa, listen_options opt) { if (_reuseport) - return server_socket(std::make_unique(sa, engine.posix_listen(sa, opt))); + return server_socket(std::make_unique(sa, engine().posix_listen(sa, opt))); else return server_socket(std::make_unique(sa)); } future posix_ap_network_stack::connect(socket_address sa) { - return engine.posix_connect(sa).then([] (pollable_fd fd) { + return engine().posix_connect(sa).then([] (pollable_fd fd) { std::unique_ptr csi(new posix_connected_socket_impl(std::move(fd))); return make_ready_future(connected_socket(std::move(csi))); }); @@ -221,7 +221,7 @@ public: auto sa = make_ipv4_address(bind_address); file_desc fd = file_desc::socket(sa.u.sa.sa_family, SOCK_DGRAM | SOCK_NONBLOCK | SOCK_CLOEXEC, 0); fd.setsockopt(SOL_IP, IP_PKTINFO, true); - if (engine.posix_reuseport_available()) { + if (engine().posix_reuseport_available()) { fd.setsockopt(SOL_SOCKET, SO_REUSEPORT, 1); } fd.bind(sa.u.sa, sizeof(sa.u.sas)); diff --git a/net/posix-stack.hh b/net/posix-stack.hh index 01439d3c81..eb761eb3e2 100644 --- a/net/posix-stack.hh +++ b/net/posix-stack.hh @@ -71,7 +71,7 @@ class posix_network_stack : public network_stack { private: const bool _reuseport; public: - explicit posix_network_stack(boost::program_options::variables_map opts) : _reuseport(engine.posix_reuseport_available()) {} + explicit posix_network_stack(boost::program_options::variables_map opts) : _reuseport(engine().posix_reuseport_available()) {} virtual server_socket listen(socket_address sa, listen_options opts) override; virtual future connect(socket_address sa) override; virtual net::udp_channel make_udp_channel(ipv4_addr addr) override; @@ -85,7 +85,7 @@ class posix_ap_network_stack : public posix_network_stack { private: const bool _reuseport; public: - posix_ap_network_stack(boost::program_options::variables_map opts) : posix_network_stack(std::move(opts)), _reuseport(engine.posix_reuseport_available()) {} + posix_ap_network_stack(boost::program_options::variables_map opts) : posix_network_stack(std::move(opts)), _reuseport(engine().posix_reuseport_available()) {} virtual server_socket listen(socket_address sa, listen_options opts) override; virtual future connect(socket_address sa) override; static future> create(boost::program_options::variables_map opts) { diff --git a/net/proxy.cc b/net/proxy.cc index 22a025dfcc..b5171e606f 100644 --- a/net/proxy.cc +++ b/net/proxy.cc @@ -39,7 +39,7 @@ uint32_t proxy_net_device::send(circular_buffer& p) if (!_moving.empty()) { qp* dev = &_dev->queue_for_cpu(_cpu); - auto cpu = engine.cpu_id(); + auto cpu = engine().cpu_id(); smp::submit_to(_cpu, [this, dev, cpu]() mutable { for(size_t i = 0; i < _moving.size(); i++) { dev->proxy_send(_moving[i].free_on_cpu(cpu, [this] { _send_depth--; })); diff --git a/net/tcp.hh b/net/tcp.hh index 7be5520901..49e7f828bd 100644 --- a/net/tcp.hh +++ b/net/tcp.hh @@ -469,7 +469,7 @@ future::connection> tcp::connect(socket_add do { src_port = _port_dist(_e); id = connid{src_ip, dst_ip, src_port, dst_port}; - } while (_inet._inet.netif()->hash2cpu(id.hash()) != engine.cpu_id() + } while (_inet._inet.netif()->hash2cpu(id.hash()) != engine().cpu_id() || _tcbs.find(id) != _tcbs.end()); auto tcbp = make_lw_shared(*this, id); diff --git a/net/virtio.cc b/net/virtio.cc index a0b8fc7934..c86c89c2fa 100644 --- a/net/virtio.cc +++ b/net/virtio.cc @@ -912,7 +912,7 @@ qp_osv::qp_osv(osv::assigned_virtio &virtio, // Set up interrupts // FIXME: in OSv, the first thing we do in the handler is to call // _rqx.disable_interrupts(). Here in seastar, we only do it much later - // in the main engine. Probably needs to do it like in osv - in the beginning of the handler. + // in the main engine(). Probably needs to do it like in osv - in the beginning of the handler. _virtio.enable_interrupt( 0, [&] { _rxq.wake_notifier_wait(); } ); _virtio.enable_interrupt( diff --git a/tests/blkdiscard_test.cc b/tests/blkdiscard_test.cc index b5702fc649..92cadff242 100644 --- a/tests/blkdiscard_test.cc +++ b/tests/blkdiscard_test.cc @@ -25,7 +25,7 @@ int main(int ac, char** av) { auto&& config = app.configuration(); auto filepath = config["dev"].as(); - engine.open_file_dma(filepath).then([] (file f) { + engine().open_file_dma(filepath).then([] (file f) { auto ft = new file_test{std::move(f)}; ft->f.stat().then([ft] (struct stat st) mutable { @@ -42,7 +42,7 @@ int main(int ac, char** av) { }).then([ft] () mutable { std::cout << "done\n"; delete ft; - engine.exit(0); + engine().exit(0); }); }); }); diff --git a/tests/directory_test.cc b/tests/directory_test.cc index c87edeaa10..1f6e9129d7 100644 --- a/tests/directory_test.cc +++ b/tests/directory_test.cc @@ -25,11 +25,11 @@ int main(int ac, char** av) { } }; return app_template().run(ac, av, [] { - return engine.open_directory(".").then([] (file f) { + return engine().open_directory(".").then([] (file f) { auto l = make_lw_shared(std::move(f)); return l->done().then([l] { // ugly thing to keep *l alive - engine.exit(0); + engine().exit(0); }); }); }); diff --git a/tests/echotest.cc b/tests/echotest.cc index bc3f80c3f4..d1cbbc3480 100644 --- a/tests/echotest.cc +++ b/tests/echotest.cc @@ -101,7 +101,7 @@ int main(int ac, char** av) { dnet->receive([vnet, &rx] (packet p) { return echo_packet(*vnet, std::move(p)); }); - engine.run(); + engine().run(); return 0; } diff --git a/tests/fileiotest.cc b/tests/fileiotest.cc index baf3ca5800..ea59193417 100644 --- a/tests/fileiotest.cc +++ b/tests/fileiotest.cc @@ -14,7 +14,7 @@ struct file_test { int main(int ac, char** av) { static constexpr auto max = 10000; - engine.open_file_dma("testfile.tmp").then([] (file f) { + engine().open_file_dma("testfile.tmp").then([] (file f) { auto ft = new file_test{std::move(f)}; for (size_t i = 0; i < max; ++i) { ft->par.wait().then([ft, i] { @@ -45,7 +45,7 @@ int main(int ac, char** av) { ::exit(0); }); }); - engine.run(); + engine().run(); return 0; } diff --git a/tests/ip_test.cc b/tests/ip_test.cc index 53a8308acd..0bc4cf29e2 100644 --- a/tests/ip_test.cc +++ b/tests/ip_test.cc @@ -20,7 +20,7 @@ int main(int ac, char** av) { interface netif(std::move(vnet)); ipv4 inet(&netif); inet.set_host_address(ipv4_address("192.168.122.2")); - engine.run(); + engine().run(); return 0; } diff --git a/tests/l3_test.cc b/tests/l3_test.cc index 7dd989d7cd..21ced25c51 100644 --- a/tests/l3_test.cc +++ b/tests/l3_test.cc @@ -23,7 +23,7 @@ int main(int ac, char** av) { interface netif(std::move(vnet)); l3_protocol arp(&netif, eth_protocol_num::arp, []{ return std::experimental::optional(); }); dump_arp_packets(arp); - engine.run(); + engine().run(); return 0; } diff --git a/tests/smp_test.cc b/tests/smp_test.cc index 6234c23917..7734f63f58 100644 --- a/tests/smp_test.cc +++ b/tests/smp_test.cc @@ -55,7 +55,7 @@ int main(int ac, char** av) { return report("smp exception", test_smp_exception()); }).then([] { print("\n%d tests / %d failures\n", tests, fails); - engine.exit(fails ? 1 : 0); + engine().exit(fails ? 1 : 0); }); }); } diff --git a/tests/tcp_client.cc b/tests/tcp_client.cc index 0df1f93723..b55d6e077b 100644 --- a/tests/tcp_client.cc +++ b/tests/tcp_client.cc @@ -63,7 +63,7 @@ public: }; void start(ipv4_addr server_addr, std::string mode) { - engine.net().connect(make_ipv4_address(server_addr)).then([this, mode] (connected_socket fd) { + engine().net().connect(make_ipv4_address(server_addr)).then([this, mode] (connected_socket fd) { _sockets.push_back(std::move(fd)); auto conn = new connection(std::move(_sockets[0])); if (mode == "write") { diff --git a/tests/tcp_server.cc b/tests/tcp_server.cc index f226061cc6..92e279d7f6 100644 --- a/tests/tcp_server.cc +++ b/tests/tcp_server.cc @@ -26,7 +26,7 @@ public: future<> listen(ipv4_addr addr) { listen_options lo; lo.reuse_address = true; - _listeners.push_back(engine.listen(make_ipv4_address(addr), lo)); + _listeners.push_back(engine().listen(make_ipv4_address(addr), lo)); do_accepts(_listeners.size() - 1); return make_ready_future<>(); } diff --git a/tests/tcp_test.cc b/tests/tcp_test.cc index 2145f1a277..6f0ff968d4 100644 --- a/tests/tcp_test.cc +++ b/tests/tcp_test.cc @@ -46,8 +46,8 @@ int main(int ac, char** av) { ipv4 inet(&netif); inet.set_host_address(ipv4_address("192.168.122.2")); tcp_test tt(inet); - engine.when_started().then([&tt] { tt.run(); }); - engine.run(); + engine().when_started().then([&tt] { tt.run(); }); + engine().run(); } diff --git a/tests/test-reactor.cc b/tests/test-reactor.cc index 328aa9a992..2b4a280263 100644 --- a/tests/test-reactor.cc +++ b/tests/test-reactor.cc @@ -41,9 +41,9 @@ int main(int ac, char** av) ipv4_addr addr{10000}; listen_options lo; lo.reuse_address = true; - test t(engine.posix_listen(make_ipv4_address(addr), lo)); + test t(engine().posix_listen(make_ipv4_address(addr), lo)); t.start_accept(); - engine.run(); + engine().run(); return 0; } diff --git a/tests/test-utils.hh b/tests/test-utils.hh index e20343b5ea..415c505114 100644 --- a/tests/test-utils.hh +++ b/tests/test-utils.hh @@ -33,18 +33,18 @@ public: boost::program_options::variables_map configuration; auto opts = reactor::get_options_description(); bpo::store(bpo::command_line_parser(0, nullptr).options(opts).run(), configuration); - engine.configure(configuration); - engine.when_started().then([this] { + engine().configure(configuration); + engine().when_started().then([this] { return run_test_case(); }).rescue([] (auto get) { try { get(); - engine.exit(0); + engine().exit(0); } catch (...) { std::terminate(); } }); - engine.run(); + engine().run(); }); t.join(); } diff --git a/tests/udp_client.cc b/tests/udp_client.cc index c69590072a..c7fd99258b 100644 --- a/tests/udp_client.cc +++ b/tests/udp_client.cc @@ -19,7 +19,7 @@ public: void start(ipv4_addr server_addr) { std::cout << "Sending to " << server_addr << std::endl; - _chan = engine.net().make_udp_channel(); + _chan = engine().net().make_udp_channel(); _stats_timer.set_callback([this] { std::cout << "Out: " << n_sent << " pps, \t"; diff --git a/tests/udp_server.cc b/tests/udp_server.cc index 05a194ea8c..4458fca684 100644 --- a/tests/udp_server.cc +++ b/tests/udp_server.cc @@ -17,7 +17,7 @@ private: public: void start(uint16_t port) { ipv4_addr listen_addr{port}; - _chan = engine.net().make_udp_channel(listen_addr); + _chan = engine().net().make_udp_channel(listen_addr); _stats_timer.set_callback([this] { std::cout << "Out: " << _n_sent << " pps" << std::endl; diff --git a/tests/udp_zero_copy.cc b/tests/udp_zero_copy.cc index 4e12443853..70c1027caa 100644 --- a/tests/udp_zero_copy.cc +++ b/tests/udp_zero_copy.cc @@ -52,7 +52,7 @@ public: } void start(int chunk_size, bool copy, size_t mem_size) { ipv4_addr listen_addr{10000}; - _chan = engine.net().make_udp_channel(listen_addr); + _chan = engine().net().make_udp_channel(listen_addr); std::cout << "Listening on " << listen_addr << std::endl;