core: add local engine accessor function

Do not use thread local engine variable directly, but use accessor
instead.
This commit is contained in:
Gleb Natapov
2015-01-27 14:03:48 +02:00
committed by Avi Kivity
parent 18d212b04e
commit 7a92efe8d1
34 changed files with 145 additions and 141 deletions

View File

@@ -39,7 +39,7 @@ public:
future<> listen(ipv4_addr addr) {
listen_options lo;
lo.reuse_address = true;
_listeners.push_back(engine.listen(make_ipv4_address(addr), lo));
_listeners.push_back(engine().listen(make_ipv4_address(addr), lo));
do_accepts(_listeners.size() - 1);
return make_ready_future<>();
}

View File

@@ -1101,7 +1101,7 @@ public:
}
ss << histo[i] << "\n";
}
return {engine.cpu_id(), make_foreign(make_lw_shared<std::string>(ss.str()))};
return {engine().cpu_id(), make_foreign(make_lw_shared<std::string>(ss.str()))};
}
future<> stop() { return make_ready_future<>(); }
@@ -1130,7 +1130,7 @@ public:
// The caller must keep @insertion live until the resulting future resolves.
future<bool> set(item_insertion_data& insertion) {
auto cpu = get_cpu(insertion.key);
if (engine.cpu_id() == cpu) {
if (engine().cpu_id() == cpu) {
return make_ready_future<bool>(_peers.local().set(insertion));
}
return _peers.invoke_on(cpu, &cache<WithFlashCache>::remote_set, std::ref(insertion));
@@ -1139,7 +1139,7 @@ public:
// The caller must keep @insertion live until the resulting future resolves.
future<bool> add(item_insertion_data& insertion) {
auto cpu = get_cpu(insertion.key);
if (engine.cpu_id() == cpu) {
if (engine().cpu_id() == cpu) {
return make_ready_future<bool>(_peers.local().add(insertion));
}
return _peers.invoke_on(cpu, &cache<WithFlashCache>::remote_add, std::ref(insertion));
@@ -1148,7 +1148,7 @@ public:
// The caller must keep @insertion live until the resulting future resolves.
future<bool> replace(item_insertion_data& insertion) {
auto cpu = get_cpu(insertion.key);
if (engine.cpu_id() == cpu) {
if (engine().cpu_id() == cpu) {
return make_ready_future<bool>(_peers.local().replace(insertion));
}
return _peers.invoke_on(cpu, &cache<WithFlashCache>::remote_replace, std::ref(insertion));
@@ -1169,7 +1169,7 @@ public:
// The caller must keep @insertion live until the resulting future resolves.
future<cas_result> cas(item_insertion_data& insertion, typename item<WithFlashCache>::version_type version) {
auto cpu = get_cpu(insertion.key);
if (engine.cpu_id() == cpu) {
if (engine().cpu_id() == cpu) {
return make_ready_future<cas_result>(_peers.local().cas(insertion, version));
}
return _peers.invoke_on(cpu, &cache<WithFlashCache>::remote_cas, std::ref(insertion), std::move(version));
@@ -1182,7 +1182,7 @@ public:
// The caller must keep @key live until the resulting future resolves.
future<std::pair<item_ptr<WithFlashCache>, bool>> incr(item_key& key, uint64_t delta) {
auto cpu = get_cpu(key);
if (engine.cpu_id() == cpu) {
if (engine().cpu_id() == cpu) {
return make_ready_future<std::pair<item_ptr<WithFlashCache>, bool>>(
_peers.local().incr(key, delta));
}
@@ -1192,7 +1192,7 @@ public:
// The caller must keep @key live until the resulting future resolves.
future<std::pair<item_ptr<WithFlashCache>, bool>> decr(item_key& key, uint64_t delta) {
auto cpu = get_cpu(key);
if (engine.cpu_id() == cpu) {
if (engine().cpu_id() == cpu) {
return make_ready_future<std::pair<item_ptr<WithFlashCache>, bool>>(
_peers.local().decr(key, delta));
}
@@ -1658,7 +1658,7 @@ public:
}
void start() {
_chan = engine.net().make_udp_channel({_port});
_chan = engine().net().make_udp_channel({_port});
keep_doing([this] {
return _chan.receive().then([this](udp_datagram dgram) {
packet& p = dgram.get_data();
@@ -1734,7 +1734,7 @@ public:
void start() {
listen_options lo;
lo.reuse_address = true;
_listener = engine.listen(make_ipv4_address({_port}), lo);
_listener = engine().listen(make_ipv4_address({_port}), lo);
keep_doing([this] {
return _listener->accept().then([this] (connected_socket fd, socket_address addr) mutable {
auto conn = make_lw_shared<connection>(std::move(fd), addr, _cache, _system_stats);
@@ -1811,10 +1811,10 @@ int start_instance(int ac, char** av) {
;
return app.run(ac, av, [&] {
engine.at_exit([&] { return tcp_server.stop(); });
engine.at_exit([&] { return udp_server.stop(); });
engine.at_exit([&] { return cache_peers.stop(); });
engine.at_exit([&] { return system_stats.stop(); });
engine().at_exit([&] { return tcp_server.stop(); });
engine().at_exit([&] { return udp_server.stop(); });
engine().at_exit([&] { return cache_peers.stop(); });
engine().at_exit([&] { return system_stats.stop(); });
auto&& config = app.configuration();
return cache_peers.start().then([&system_stats] {
@@ -1822,7 +1822,7 @@ int start_instance(int ac, char** av) {
}).then([&] {
if (WithFlashCache) {
auto device_path = config["device"].as<std::string>();
return engine.open_file_dma(device_path).then([&] (file f) {
return engine().open_file_dma(device_path).then([&] (file f) {
auto dev = make_lw_shared<flashcache::devfile>({std::move(f)});
return dev->f().stat().then([&, dev] (struct stat st) mutable {
assert(S_ISBLK(st.st_mode));
@@ -1849,7 +1849,7 @@ int start_instance(int ac, char** av) {
}).then([&tcp_server] {
return tcp_server.invoke_on_all(&memcache::tcp_server<WithFlashCache>::start);
}).then([&] {
if (engine.net().has_per_core_namespace()) {
if (engine().net().has_per_core_namespace()) {
return udp_server.start(std::ref(cache), std::ref(system_stats));
} else {
return udp_server.start_single(std::ref(cache), std::ref(system_stats));

View File

@@ -58,7 +58,7 @@ app_template::run(int ac, char ** av, std::function<void ()>&& func) {
}
smp::configure(configuration);
_configuration = {std::move(configuration)};
engine.when_started().then([this] {
engine().when_started().then([this] {
scollectd::configure( this->configuration());
}).then(
std::move(func)
@@ -67,10 +67,10 @@ app_template::run(int ac, char ** av, std::function<void ()>&& func) {
get_ex();
} catch (std::exception& ex) {
std::cout << "program failed with uncaught exception: " << ex.what() << "\n";
engine.exit(1);
engine().exit(1);
}
});
return engine.run();
return engine().run();
}

View File

@@ -177,7 +177,7 @@ distributed<Service>::invoke_on_all(void (Service::*func)(Args...), Args... args
template <typename Service>
Service& distributed<Service>::local() {
return *_instances[engine.cpu_id()];
return *_instances[engine().cpu_id()];
}
// Smart pointer wrapper which makes it safe to move across CPUs.
@@ -190,19 +190,19 @@ private:
unsigned _cpu;
private:
bool on_origin() {
return engine.cpu_id() == _cpu;
return engine().cpu_id() == _cpu;
}
public:
using element_type = typename PtrType::element_type;
foreign_ptr()
: _value(PtrType())
, _cpu(engine.cpu_id()) {
, _cpu(engine().cpu_id()) {
}
foreign_ptr(std::nullptr_t) : foreign_ptr() {}
foreign_ptr(PtrType value)
: _value(std::move(value))
, _cpu(engine.cpu_id()) {
, _cpu(engine().cpu_id()) {
}
// The type is intentionally non-copyable because copies
// are expensive because each copy requires across-CPU call.

View File

@@ -153,7 +153,7 @@ future<> reactor_backend_epoll::get_epoll_future(pollable_fd_state& pfd,
eevt.data.ptr = &pfd;
int r = ::epoll_ctl(_epollfd.get(), ctl, pfd.fd.get(), &eevt);
assert(r == 0);
engine.start_epoll();
engine().start_epoll();
}
pfd.*pr = promise<>();
return (pfd.*pr).get_future();
@@ -273,7 +273,7 @@ bool reactor::process_io()
future<size_t>
posix_file_impl::write_dma(uint64_t pos, const void* buffer, size_t len) {
return engine.submit_io([this, pos, buffer, len] (iocb& io) {
return engine().submit_io([this, pos, buffer, len] (iocb& io) {
io_prep_pwrite(&io, _fd, const_cast<void*>(buffer), len, pos);
}).then([] (io_event ev) {
throw_kernel_error(long(ev.res));
@@ -283,7 +283,7 @@ posix_file_impl::write_dma(uint64_t pos, const void* buffer, size_t len) {
future<size_t>
posix_file_impl::write_dma(uint64_t pos, std::vector<iovec> iov) {
return engine.submit_io([this, pos, iov = std::move(iov)] (iocb& io) {
return engine().submit_io([this, pos, iov = std::move(iov)] (iocb& io) {
io_prep_pwritev(&io, _fd, iov.data(), iov.size(), pos);
}).then([] (io_event ev) {
throw_kernel_error(long(ev.res));
@@ -293,7 +293,7 @@ posix_file_impl::write_dma(uint64_t pos, std::vector<iovec> iov) {
future<size_t>
posix_file_impl::read_dma(uint64_t pos, void* buffer, size_t len) {
return engine.submit_io([this, pos, buffer, len] (iocb& io) {
return engine().submit_io([this, pos, buffer, len] (iocb& io) {
io_prep_pread(&io, _fd, buffer, len, pos);
}).then([] (io_event ev) {
throw_kernel_error(long(ev.res));
@@ -303,7 +303,7 @@ posix_file_impl::read_dma(uint64_t pos, void* buffer, size_t len) {
future<size_t>
posix_file_impl::read_dma(uint64_t pos, std::vector<iovec> iov) {
return engine.submit_io([this, pos, iov = std::move(iov)] (iocb& io) {
return engine().submit_io([this, pos, iov = std::move(iov)] (iocb& io) {
io_prep_preadv(&io, _fd, iov.data(), iov.size(), pos);
}).then([] (io_event ev) {
throw_kernel_error(long(ev.res));
@@ -333,7 +333,7 @@ reactor::open_directory(sstring name) {
future<>
posix_file_impl::flush(void) {
return engine._thread_pool.submit<syscall_result<int>>([this] {
return engine()._thread_pool.submit<syscall_result<int>>([this] {
return wrap_syscall<int>(::fsync(_fd));
}).then([] (syscall_result<int> sr) {
sr.throw_if_error();
@@ -343,7 +343,7 @@ posix_file_impl::flush(void) {
future<struct stat>
posix_file_impl::stat(void) {
return engine._thread_pool.submit<syscall_result_extra<struct stat>>([this] {
return engine()._thread_pool.submit<syscall_result_extra<struct stat>>([this] {
struct stat st;
auto ret = ::fstat(_fd, &st);
return wrap_syscall(ret, st);
@@ -355,7 +355,7 @@ posix_file_impl::stat(void) {
future<>
posix_file_impl::discard(uint64_t offset, uint64_t length) {
return engine._thread_pool.submit<syscall_result<int>>([this, offset, length] () mutable {
return engine()._thread_pool.submit<syscall_result<int>>([this, offset, length] () mutable {
return wrap_syscall<int>(::fallocate(_fd, FALLOC_FL_PUNCH_HOLE|FALLOC_FL_KEEP_SIZE,
offset, length));
}).then([] (syscall_result<int> sr) {
@@ -366,7 +366,7 @@ posix_file_impl::discard(uint64_t offset, uint64_t length) {
future<>
blockdev_file_impl::discard(uint64_t offset, uint64_t length) {
return engine._thread_pool.submit<syscall_result<int>>([this, offset, length] () mutable {
return engine()._thread_pool.submit<syscall_result<int>>([this, offset, length] () mutable {
uint64_t range[2] { offset, length };
return wrap_syscall<int>(::ioctl(_fd, BLKDISCARD, &range));
}).then([] (syscall_result<int> sr) {
@@ -384,7 +384,7 @@ posix_file_impl::size(void) {
future<size_t>
blockdev_file_impl::size(void) {
return engine._thread_pool.submit<syscall_result_extra<size_t>>([this] {
return engine()._thread_pool.submit<syscall_result_extra<size_t>>([this] {
size_t size;
int ret = ::ioctl(_fd, BLKGETSIZE64, &size);
return wrap_syscall(ret, size);
@@ -431,7 +431,7 @@ posix_file_impl::list_directory(std::function<future<> (directory_entry de)> nex
auto eofcond = [w] { return w->eof; };
return do_until(eofcond, [w, this] {
if (w->current == w->total) {
return engine._thread_pool.submit<syscall_result<long>>([w , this] () {
return engine()._thread_pool.submit<syscall_result<long>>([w , this] () {
auto ret = ::syscall(__NR_getdents, _fd, reinterpret_cast<linux_dirent*>(w->buffer), sizeof(w->buffer));
return wrap_syscall(ret);
}).then([w] (syscall_result<long> ret) {
@@ -524,13 +524,13 @@ future<> reactor::run_exit_tasks() {
}
void reactor::stop() {
assert(engine._id == 0);
assert(engine()._id == 0);
run_exit_tasks().then([this] {
auto sem = new semaphore(0);
for (unsigned i = 1; i < smp::count; i++) {
smp::submit_to<>(i, []() {
return engine.run_exit_tasks().then([] {
engine._stopped = true;
return engine().run_exit_tasks().then([] {
engine()._stopped = true;
});
}).then([sem, i]() {
sem->signal();
@@ -558,7 +558,7 @@ reactor::receive_signal(int signo) {
}
void sigaction(int signo, siginfo_t* siginfo, void* ignore) {
engine._pending_signals.fetch_or(1ull << signo, std::memory_order_relaxed);
engine()._pending_signals.fetch_or(1ull << signo, std::memory_order_relaxed);
}
bool reactor::poll_signal() {
@@ -691,7 +691,7 @@ int reactor::run() {
_network_stack = std::move(stack);
for (unsigned c = 0; c < smp::count; c++) {
smp::submit_to(c, [] {
engine._cpu_started.signal();
engine()._cpu_started.signal();
});
}
});
@@ -804,7 +804,7 @@ public:
explicit registration_task(poller* p) : _p(p) {}
virtual void run() noexcept override {
if (_p) {
engine.register_poller(_p->_pollfn.get());
engine().register_poller(_p->_pollfn.get());
_p->_registration_task = nullptr;
}
}
@@ -822,7 +822,7 @@ private:
public:
explicit deregistration_task(std::unique_ptr<pollfn>&& p) : _p(std::move(p)) {}
virtual void run() noexcept override {
engine.unregister_poller(_p.get());
engine().unregister_poller(_p.get());
}
};
@@ -862,7 +862,7 @@ reactor::poller::do_register() {
// the poller instead.
auto task = std::make_unique<registration_task>(this);
auto tmp = task.get();
engine.add_task(std::move(task));
engine().add_task(std::move(task));
_registration_task = tmp;
}
@@ -883,8 +883,8 @@ reactor::poller::~poller() {
auto dummy = make_pollfn([] { return false; });
auto dummy_p = dummy.get();
auto task = std::make_unique<deregistration_task>(std::move(dummy));
engine.add_task(std::move(task));
engine.replace_poller(_pollfn.get(), dummy_p);
engine().add_task(std::move(task));
engine().replace_poller(_pollfn.get(), dummy_p);
}
}
}
@@ -965,7 +965,7 @@ void smp_message_queue::submit_item(smp_message_queue::work_item* item) {
void smp_message_queue::respond(work_item* item) {
_completed_fifo.push_back(item);
if (_completed_fifo.size() >= batch_size || engine._stopped) {
if (_completed_fifo.size() >= batch_size || engine()._stopped) {
flush_response_batch();
}
}
@@ -1027,7 +1027,7 @@ size_t smp_message_queue::process_incoming() {
void smp_message_queue::start(unsigned cpuid) {
_tx.init();
char instance[10];
std::snprintf(instance, sizeof(instance), "%u-%u", engine.cpu_id(), cpuid);
std::snprintf(instance, sizeof(instance), "%u-%u", engine().cpu_id(), cpuid);
_collectd_regs = {
// queue_length value:GAUGE:0:U
// Absolute value of num packets in last tx batch.
@@ -1076,7 +1076,7 @@ void smp_message_queue::start(unsigned cpuid) {
#ifndef HAVE_OSV
thread_pool::thread_pool() : _worker_thread([this] { work(); }), _notify(pthread_self()) {
keep_doing([this] {
return engine.receive_signal(SIGUSR1).then([this] { inter_thread_wq.complete(); });
return engine().receive_signal(SIGUSR1).then([this] { inter_thread_wq.complete(); });
});
}
@@ -1132,7 +1132,7 @@ file_desc readable_eventfd::try_create_eventfd(size_t initial) {
}
future<size_t> readable_eventfd::wait() {
return engine.readable(*_fd._s).then([this] {
return engine().readable(*_fd._s).then([this] {
uint64_t count;
int r = ::read(_fd.get_fd(), &count, sizeof(count));
assert(r == sizeof(count));
@@ -1141,7 +1141,7 @@ future<size_t> readable_eventfd::wait() {
}
void schedule(std::unique_ptr<task> t) {
engine.add_task(std::move(t));
engine().add_task(std::move(t));
}
bool operator==(const ::sockaddr_in a, const ::sockaddr_in b) {
@@ -1219,8 +1219,8 @@ unsigned smp::count = 1;
void smp::start_all_queues()
{
for (unsigned c = 0; c < count; c++) {
if (c != engine.cpu_id()) {
_qs[c][engine.cpu_id()].start(c);
if (c != engine().cpu_id()) {
_qs[c][engine().cpu_id()].start(c);
}
}
}
@@ -1300,11 +1300,11 @@ void smp::configure(boost::program_options::variables_map configuration)
sigfillset(&mask);
auto r = ::sigprocmask(SIG_BLOCK, &mask, NULL);
throw_system_error_on(r == -1);
engine._id = i;
engine()._id = i;
start_all_queues();
inited.wait();
engine.configure(configuration);
engine.run();
engine().configure(configuration);
engine().run();
});
}
@@ -1317,14 +1317,14 @@ void smp::configure(boost::program_options::variables_map configuration)
start_all_queues();
inited.wait();
engine.configure(configuration);
engine._lowres_clock = std::make_unique<lowres_clock>();
engine().configure(configuration);
engine()._lowres_clock = std::make_unique<lowres_clock>();
}
__thread size_t future_avail_count = 0;
__thread size_t task_quota = 0;
thread_local reactor engine;
thread_local reactor local_engine;
class reactor_notifier_epoll : public reactor_notifier {
@@ -1361,7 +1361,7 @@ class reactor_notifier_osv :
bool _needed = false;
public:
virtual future<> wait() override {
return engine.notified(this);
return engine().notified(this);
}
virtual void signal() override {
wake();

View File

@@ -811,9 +811,13 @@ reactor::make_pollfn(Func&& func) {
return std::make_unique<the_pollfn>(std::forward<Func>(func));
}
extern thread_local reactor engine;
extern thread_local reactor local_engine;
extern __thread size_t task_quota;
inline reactor& engine() {
return local_engine;
}
class smp {
#if HAVE_DPDK
using thread_adaptor = std::function<void ()>;
@@ -837,10 +841,10 @@ public:
template <typename Func>
static std::result_of_t<Func()> submit_to(unsigned t, Func func,
std::enable_if_t<returns_future<Func>::value, void*> = nullptr) {
if (t == engine.cpu_id()) {
if (t == engine().cpu_id()) {
return func();
} else {
return _qs[t][engine.cpu_id()].submit(std::move(func));
return _qs[t][engine().cpu_id()].submit(std::move(func));
}
}
template <typename Func>
@@ -861,11 +865,11 @@ public:
static bool poll_queues() {
size_t got = 0;
for (unsigned i = 0; i < count; i++) {
if (engine.cpu_id() != i) {
auto& rxq = _qs[engine.cpu_id()][i];
if (engine().cpu_id() != i) {
auto& rxq = _qs[engine().cpu_id()][i];
rxq.flush_response_batch();
got += rxq.process_incoming();
auto& txq = _qs[i][engine._id];
auto& txq = _qs[i][engine()._id];
txq.flush_request_batch();
got += txq.process_completions();
}
@@ -881,7 +885,7 @@ public:
inline
pollable_fd_state::~pollable_fd_state() {
engine.forget(*this);
engine().forget(*this);
}
class data_source_impl {
@@ -1315,32 +1319,32 @@ output_stream<CharType>::flush() {
inline
future<size_t> pollable_fd::read_some(char* buffer, size_t size) {
return engine.read_some(*_s, buffer, size);
return engine().read_some(*_s, buffer, size);
}
inline
future<size_t> pollable_fd::read_some(uint8_t* buffer, size_t size) {
return engine.read_some(*_s, buffer, size);
return engine().read_some(*_s, buffer, size);
}
inline
future<size_t> pollable_fd::read_some(const std::vector<iovec>& iov) {
return engine.read_some(*_s, iov);
return engine().read_some(*_s, iov);
}
inline
future<> pollable_fd::write_all(const char* buffer, size_t size) {
return engine.write_all(*_s, buffer, size);
return engine().write_all(*_s, buffer, size);
}
inline
future<> pollable_fd::write_all(const uint8_t* buffer, size_t size) {
return engine.write_all(*_s, buffer, size);
return engine().write_all(*_s, buffer, size);
}
inline
future<size_t> pollable_fd::write_some(net::packet& p) {
return engine.writeable(*_s).then([this, &p] () mutable {
return engine().writeable(*_s).then([this, &p] () mutable {
static_assert(offsetof(iovec, iov_base) == offsetof(net::fragment, base) &&
sizeof(iovec::iov_base) == sizeof(net::fragment::base) &&
offsetof(iovec, iov_len) == offsetof(net::fragment, size) &&
@@ -1374,22 +1378,22 @@ future<> pollable_fd::write_all(net::packet& p) {
inline
future<> pollable_fd::readable() {
return engine.readable(*_s);
return engine().readable(*_s);
}
inline
future<> pollable_fd::writeable() {
return engine.writeable(*_s);
return engine().writeable(*_s);
}
inline
future<pollable_fd, socket_address> pollable_fd::accept() {
return engine.accept(*_s);
return engine().accept(*_s);
}
inline
future<size_t> pollable_fd::recvmsg(struct msghdr *msg) {
return engine.readable(*_s).then([this, msg] {
return engine().readable(*_s).then([this, msg] {
auto r = get_file_desc().recvmsg(msg, 0);
if (!r) {
return recvmsg(msg);
@@ -1408,7 +1412,7 @@ future<size_t> pollable_fd::recvmsg(struct msghdr *msg) {
inline
future<size_t> pollable_fd::sendmsg(struct msghdr* msg) {
return engine.writeable(*_s).then([this, msg] () mutable {
return engine().writeable(*_s).then([this, msg] () mutable {
auto r = get_file_desc().sendmsg(msg, 0);
if (!r) {
return sendmsg(msg);
@@ -1425,7 +1429,7 @@ future<size_t> pollable_fd::sendmsg(struct msghdr* msg) {
inline
future<size_t> pollable_fd::sendto(socket_address addr, const void* buf, size_t len) {
return engine.writeable(*_s).then([this, buf, len, addr] () mutable {
return engine().writeable(*_s).then([this, buf, len, addr] () mutable {
auto r = get_file_desc().sendto(addr, buf, len, 0);
if (!r) {
return sendto(std::move(addr), buf, len);
@@ -1442,7 +1446,7 @@ template <typename Clock>
inline
timer<Clock>::~timer() {
if (_queued) {
engine.del_timer(this);
engine().del_timer(this);
}
}
@@ -1460,7 +1464,7 @@ void timer<Clock>::arm(time_point until, boost::optional<duration> period) {
_armed = true;
_expired = false;
_expiry = until;
engine.add_timer(this);
engine().add_timer(this);
_queued = true;
}
@@ -1493,7 +1497,7 @@ bool timer<Clock>::cancel() {
}
_armed = false;
if (_queued) {
engine.del_timer(this);
engine().del_timer(this);
_queued = false;
}
return true;

View File

@@ -93,7 +93,7 @@ public:
_period = period;
_addr = addr;
_host = host;
_chan = engine.net().make_udp_channel();
_chan = engine().net().make_udp_channel();
_timer.set_callback(std::bind(&impl::run, this));
// dogfood ourselves
@@ -285,7 +285,7 @@ private:
// Optional
put_cached(part_type::PluginInst,
id.plugin_instance() == per_cpu_plugin_instance ?
std::to_string(engine.cpu_id()) : id.plugin_instance());
std::to_string(engine().cpu_id()) : id.plugin_instance());
put_cached(part_type::Type, id.type());
// Optional
put_cached(part_type::TypeInst, id.type_instance());

View File

@@ -136,7 +136,7 @@ class kernel_evtchn: public evtchn {
public:
kernel_evtchn(unsigned otherend)
: evtchn(otherend)
, _notified(engine.make_reactor_notifier()) {}
, _notified(engine().make_reactor_notifier()) {}
virtual port bind() override;
virtual void notify(int port) override;
};

View File

@@ -316,7 +316,7 @@ public:
return make_ready_future<>();
}
handled = true;
auto src_cpu = engine.cpu_id();
auto src_cpu = engine().cpu_id();
if (src_cpu == 0) {
return process_packet(std::move(p), dhp, opt_off);
}

View File

@@ -136,7 +136,7 @@ public:
dpdk_device(uint8_t port_idx, uint16_t num_queues)
: _port_idx(port_idx)
, _num_queues(num_queues)
, _home_cpu(engine.cpu_id()) {
, _home_cpu(engine().cpu_id()) {
/* now initialise the port we will use */
int ret = init_port_start();

View File

@@ -151,7 +151,7 @@ ipv4::handle_received_packet(packet p, ethernet_address from) {
auto dropped_size = frag.mem_size;
auto& ip_data = frag.data.map.begin()->second;
// Choose a cpu to forward this packet
auto cpu_id = engine.cpu_id();
auto cpu_id = engine().cpu_id();
auto l4 = _l4[h.ip_proto];
if (l4) {
size_t l4_offset = 0;
@@ -163,7 +163,7 @@ ipv4::handle_received_packet(packet p, ethernet_address from) {
}
// No need to forward if the dst cpu is the current cpu
if (cpu_id == engine.cpu_id()) {
if (cpu_id == engine().cpu_id()) {
l4->received(std::move(ip_data), h.src_ip, h.dst_ip);
} else {
auto to = _netif->hw_address();

View File

@@ -77,7 +77,7 @@ void create_native_net_device(boost::program_options::variables_map opts) {
std::shared_ptr<device> sdev(dev.release());
for (unsigned i = 0; i < smp::count; i++) {
smp::submit_to(i, [opts, sdev] {
uint16_t qid = engine.cpu_id();
uint16_t qid = engine().cpu_id();
if (qid < sdev->hw_queues_count()) {
auto qp = sdev->init_local_queue(opts, qid);
std::map<unsigned, float> cpu_weights;
@@ -130,7 +130,7 @@ public:
virtual udp_channel make_udp_channel(ipv4_addr addr) override;
virtual future<> initialize() override;
static future<std::unique_ptr<network_stack>> create(boost::program_options::variables_map opts) {
if (engine.cpu_id() == 0) {
if (engine().cpu_id() == 0) {
create_native_net_device(opts);
}
return ready_promise.get_future();
@@ -199,7 +199,7 @@ future<> native_network_stack::run_dhcp(bool is_renew, const dhcp::lease& res) {
// Hijack the ip-stack.
for (unsigned i = 0; i < smp::count; i++) {
smp::submit_to(i, [d] {
auto & ns = static_cast<native_network_stack&>(engine.net());
auto & ns = static_cast<native_network_stack&>(engine().net());
ns.set_ipv4_packet_filter(d->get_ipv4_filter());
});
}
@@ -209,7 +209,7 @@ future<> native_network_stack::run_dhcp(bool is_renew, const dhcp::lease& res) {
return fut.then([this, d, is_renew](bool success, const dhcp::lease & res) {
for (unsigned i = 0; i < smp::count; i++) {
smp::submit_to(i, [] {
auto & ns = static_cast<native_network_stack&>(engine.net());
auto & ns = static_cast<native_network_stack&>(engine().net());
ns.set_ipv4_packet_filter(nullptr);
});
}
@@ -228,12 +228,12 @@ void native_network_stack::on_dhcp(bool success, const dhcp::lease & res, bool i
_config.set_value();
}
if (engine.cpu_id() == 0) {
if (engine().cpu_id() == 0) {
// And the other cpus, which, in the case of initial discovery,
// will be waiting for us.
for (unsigned i = 1; i < smp::count; i++) {
smp::submit_to(i, [success, res, is_renew]() {
auto & ns = static_cast<native_network_stack&>(engine.net());
auto & ns = static_cast<native_network_stack&>(engine().net());
ns.on_dhcp(success, res, is_renew);
});
}
@@ -259,7 +259,7 @@ future<> native_network_stack::initialize() {
// Only run actual discover on main cpu.
// All other cpus must simply for main thread to complete and signal them.
if (engine.cpu_id() == 0) {
if (engine().cpu_id() == 0) {
run_dhcp();
}
return _config.get_future();
@@ -270,7 +270,7 @@ void arp_learn(ethernet_address l2, ipv4_address l3)
{
for (unsigned i = 0; i < smp::count; i++) {
smp::submit_to(i, [l2, l3] {
auto & ns = static_cast<native_network_stack&>(engine.net());
auto & ns = static_cast<native_network_stack&>(engine().net());
ns.arp_learn(l2, l3);
});
}

View File

@@ -77,7 +77,7 @@ qp::~qp() {
void qp::configure_proxies(const std::map<unsigned, float>& cpu_weights) {
assert(!cpu_weights.empty());
if ((cpu_weights.size() == 1 && cpu_weights.begin()->first == engine.cpu_id())) {
if ((cpu_weights.size() == 1 && cpu_weights.begin()->first == engine().cpu_id())) {
// special case queue sending to self only, to avoid requiring a hash value
return;
}
@@ -113,15 +113,15 @@ void qp::build_sw_reta(const std::map<unsigned, float>& cpu_weights) {
subscription<packet>
device::receive(std::function<future<> (packet)> next_packet) {
auto sub = _queues[engine.cpu_id()]->_rx_stream.listen(std::move(next_packet));
_queues[engine.cpu_id()]->rx_start();
auto sub = _queues[engine().cpu_id()]->_rx_stream.listen(std::move(next_packet));
_queues[engine().cpu_id()]->rx_start();
return std::move(sub);
}
void device::set_local_queue(std::unique_ptr<qp> dev) {
assert(!_queues[engine.cpu_id()]);
_queues[engine.cpu_id()] = dev.get();
engine.at_destroy([dev = std::move(dev)] {});
assert(!_queues[engine().cpu_id()]);
_queues[engine().cpu_id()] = dev.get();
engine().at_destroy([dev = std::move(dev)] {});
}
@@ -181,7 +181,7 @@ void interface::forward(unsigned cpuid, packet p) {
if (queue_depth < 1000) {
queue_depth++;
auto src_cpu = engine.cpu_id();
auto src_cpu = engine().cpu_id();
smp::submit_to(cpuid, [this, p = std::move(p), src_cpu]() mutable {
_dev->l2receive(p.free_on_cpu(src_cpu));
}).then([] {
@@ -196,7 +196,7 @@ future<> interface::dispatch_packet(packet p) {
auto i = _proto_map.find(ntoh(eh->eth_proto));
if (i != _proto_map.end()) {
l3_rx_stream& l3 = i->second;
auto fw = _dev->forward_dst(engine.cpu_id(), [&p, &l3] () {
auto fw = _dev->forward_dst(engine().cpu_id(), [&p, &l3] () {
auto hwrss = p.rss_hash();
if (hwrss) {
return hwrss.value();
@@ -208,7 +208,7 @@ future<> interface::dispatch_packet(packet p) {
return 0u;
}
});
if (fw != engine.cpu_id()) {
if (fw != engine().cpu_id()) {
forward(fw, std::move(p));
} else {
auto h = ntoh(*eh);

View File

@@ -169,8 +169,8 @@ public:
}
virtual ~device() {};
qp& queue_for_cpu(unsigned cpu) { return *_queues[cpu]; }
qp& local_queue() { return queue_for_cpu(engine.cpu_id()); }
void l2receive(packet p) { _queues[engine.cpu_id()]->_rx_stream.produce(std::move(p)); }
qp& local_queue() { return queue_for_cpu(engine().cpu_id()); }
void l2receive(packet p) { _queues[engine().cpu_id()]->_rx_stream.produce(std::move(p)); }
subscription<packet> receive(std::function<future<> (packet)> next_packet);
virtual ethernet_address hw_address() = 0;
virtual net::hw_features hw_features() = 0;

View File

@@ -29,7 +29,7 @@ posix_server_socket_impl::accept() {
static unsigned balance = 0;
auto cpu = balance++ % smp::count;
if (cpu == engine.cpu_id()) {
if (cpu == engine().cpu_id()) {
std::unique_ptr<connected_socket_impl> csi(new posix_connected_socket_impl(std::move(fd)));
return make_ready_future<connected_socket, socket_address>(
connected_socket(std::move(csi)), sa);
@@ -126,14 +126,14 @@ posix_data_sink_impl::put(packet p) {
server_socket
posix_network_stack::listen(socket_address sa, listen_options opt) {
if (_reuseport)
return server_socket(std::make_unique<posix_reuseport_server_socket_impl>(sa, engine.posix_listen(sa, opt)));
return server_socket(std::make_unique<posix_reuseport_server_socket_impl>(sa, engine().posix_listen(sa, opt)));
else
return server_socket(std::make_unique<posix_server_socket_impl>(sa, engine.posix_listen(sa, opt)));
return server_socket(std::make_unique<posix_server_socket_impl>(sa, engine().posix_listen(sa, opt)));
}
future<connected_socket>
posix_network_stack::connect(socket_address sa) {
return engine.posix_connect(sa).then([] (pollable_fd fd) {
return engine().posix_connect(sa).then([] (pollable_fd fd) {
std::unique_ptr<connected_socket_impl> csi(new posix_connected_socket_impl(std::move(fd)));
return make_ready_future<connected_socket>(connected_socket(std::move(csi)));
});
@@ -145,14 +145,14 @@ thread_local std::unordered_multimap<::sockaddr_in, posix_ap_server_socket_impl:
server_socket
posix_ap_network_stack::listen(socket_address sa, listen_options opt) {
if (_reuseport)
return server_socket(std::make_unique<posix_reuseport_server_socket_impl>(sa, engine.posix_listen(sa, opt)));
return server_socket(std::make_unique<posix_reuseport_server_socket_impl>(sa, engine().posix_listen(sa, opt)));
else
return server_socket(std::make_unique<posix_ap_server_socket_impl>(sa));
}
future<connected_socket>
posix_ap_network_stack::connect(socket_address sa) {
return engine.posix_connect(sa).then([] (pollable_fd fd) {
return engine().posix_connect(sa).then([] (pollable_fd fd) {
std::unique_ptr<connected_socket_impl> csi(new posix_connected_socket_impl(std::move(fd)));
return make_ready_future<connected_socket>(connected_socket(std::move(csi)));
});
@@ -221,7 +221,7 @@ public:
auto sa = make_ipv4_address(bind_address);
file_desc fd = file_desc::socket(sa.u.sa.sa_family, SOCK_DGRAM | SOCK_NONBLOCK | SOCK_CLOEXEC, 0);
fd.setsockopt(SOL_IP, IP_PKTINFO, true);
if (engine.posix_reuseport_available()) {
if (engine().posix_reuseport_available()) {
fd.setsockopt(SOL_SOCKET, SO_REUSEPORT, 1);
}
fd.bind(sa.u.sa, sizeof(sa.u.sas));

View File

@@ -71,7 +71,7 @@ class posix_network_stack : public network_stack {
private:
const bool _reuseport;
public:
explicit posix_network_stack(boost::program_options::variables_map opts) : _reuseport(engine.posix_reuseport_available()) {}
explicit posix_network_stack(boost::program_options::variables_map opts) : _reuseport(engine().posix_reuseport_available()) {}
virtual server_socket listen(socket_address sa, listen_options opts) override;
virtual future<connected_socket> connect(socket_address sa) override;
virtual net::udp_channel make_udp_channel(ipv4_addr addr) override;
@@ -85,7 +85,7 @@ class posix_ap_network_stack : public posix_network_stack {
private:
const bool _reuseport;
public:
posix_ap_network_stack(boost::program_options::variables_map opts) : posix_network_stack(std::move(opts)), _reuseport(engine.posix_reuseport_available()) {}
posix_ap_network_stack(boost::program_options::variables_map opts) : posix_network_stack(std::move(opts)), _reuseport(engine().posix_reuseport_available()) {}
virtual server_socket listen(socket_address sa, listen_options opts) override;
virtual future<connected_socket> connect(socket_address sa) override;
static future<std::unique_ptr<network_stack>> create(boost::program_options::variables_map opts) {

View File

@@ -39,7 +39,7 @@ uint32_t proxy_net_device::send(circular_buffer<packet>& p)
if (!_moving.empty()) {
qp* dev = &_dev->queue_for_cpu(_cpu);
auto cpu = engine.cpu_id();
auto cpu = engine().cpu_id();
smp::submit_to(_cpu, [this, dev, cpu]() mutable {
for(size_t i = 0; i < _moving.size(); i++) {
dev->proxy_send(_moving[i].free_on_cpu(cpu, [this] { _send_depth--; }));

View File

@@ -469,7 +469,7 @@ future<typename tcp<InetTraits>::connection> tcp<InetTraits>::connect(socket_add
do {
src_port = _port_dist(_e);
id = connid{src_ip, dst_ip, src_port, dst_port};
} while (_inet._inet.netif()->hash2cpu(id.hash()) != engine.cpu_id()
} while (_inet._inet.netif()->hash2cpu(id.hash()) != engine().cpu_id()
|| _tcbs.find(id) != _tcbs.end());
auto tcbp = make_lw_shared<tcb>(*this, id);

View File

@@ -912,7 +912,7 @@ qp_osv::qp_osv(osv::assigned_virtio &virtio,
// Set up interrupts
// FIXME: in OSv, the first thing we do in the handler is to call
// _rqx.disable_interrupts(). Here in seastar, we only do it much later
// in the main engine. Probably needs to do it like in osv - in the beginning of the handler.
// in the main engine(). Probably needs to do it like in osv - in the beginning of the handler.
_virtio.enable_interrupt(
0, [&] { _rxq.wake_notifier_wait(); } );
_virtio.enable_interrupt(

View File

@@ -25,7 +25,7 @@ int main(int ac, char** av) {
auto&& config = app.configuration();
auto filepath = config["dev"].as<std::string>();
engine.open_file_dma(filepath).then([] (file f) {
engine().open_file_dma(filepath).then([] (file f) {
auto ft = new file_test{std::move(f)};
ft->f.stat().then([ft] (struct stat st) mutable {
@@ -42,7 +42,7 @@ int main(int ac, char** av) {
}).then([ft] () mutable {
std::cout << "done\n";
delete ft;
engine.exit(0);
engine().exit(0);
});
});
});

View File

@@ -25,11 +25,11 @@ int main(int ac, char** av) {
}
};
return app_template().run(ac, av, [] {
return engine.open_directory(".").then([] (file f) {
return engine().open_directory(".").then([] (file f) {
auto l = make_lw_shared<lister>(std::move(f));
return l->done().then([l] {
// ugly thing to keep *l alive
engine.exit(0);
engine().exit(0);
});
});
});

View File

@@ -101,7 +101,7 @@ int main(int ac, char** av) {
dnet->receive([vnet, &rx] (packet p) {
return echo_packet(*vnet, std::move(p));
});
engine.run();
engine().run();
return 0;
}

View File

@@ -14,7 +14,7 @@ struct file_test {
int main(int ac, char** av) {
static constexpr auto max = 10000;
engine.open_file_dma("testfile.tmp").then([] (file f) {
engine().open_file_dma("testfile.tmp").then([] (file f) {
auto ft = new file_test{std::move(f)};
for (size_t i = 0; i < max; ++i) {
ft->par.wait().then([ft, i] {
@@ -45,7 +45,7 @@ int main(int ac, char** av) {
::exit(0);
});
});
engine.run();
engine().run();
return 0;
}

View File

@@ -20,7 +20,7 @@ int main(int ac, char** av) {
interface netif(std::move(vnet));
ipv4 inet(&netif);
inet.set_host_address(ipv4_address("192.168.122.2"));
engine.run();
engine().run();
return 0;
}

View File

@@ -23,7 +23,7 @@ int main(int ac, char** av) {
interface netif(std::move(vnet));
l3_protocol arp(&netif, eth_protocol_num::arp, []{ return std::experimental::optional<l3_protocol::l3packet>(); });
dump_arp_packets(arp);
engine.run();
engine().run();
return 0;
}

View File

@@ -55,7 +55,7 @@ int main(int ac, char** av) {
return report("smp exception", test_smp_exception());
}).then([] {
print("\n%d tests / %d failures\n", tests, fails);
engine.exit(fails ? 1 : 0);
engine().exit(fails ? 1 : 0);
});
});
}

View File

@@ -63,7 +63,7 @@ public:
};
void start(ipv4_addr server_addr, std::string mode) {
engine.net().connect(make_ipv4_address(server_addr)).then([this, mode] (connected_socket fd) {
engine().net().connect(make_ipv4_address(server_addr)).then([this, mode] (connected_socket fd) {
_sockets.push_back(std::move(fd));
auto conn = new connection(std::move(_sockets[0]));
if (mode == "write") {

View File

@@ -26,7 +26,7 @@ public:
future<> listen(ipv4_addr addr) {
listen_options lo;
lo.reuse_address = true;
_listeners.push_back(engine.listen(make_ipv4_address(addr), lo));
_listeners.push_back(engine().listen(make_ipv4_address(addr), lo));
do_accepts(_listeners.size() - 1);
return make_ready_future<>();
}

View File

@@ -46,8 +46,8 @@ int main(int ac, char** av) {
ipv4 inet(&netif);
inet.set_host_address(ipv4_address("192.168.122.2"));
tcp_test tt(inet);
engine.when_started().then([&tt] { tt.run(); });
engine.run();
engine().when_started().then([&tt] { tt.run(); });
engine().run();
}

View File

@@ -41,9 +41,9 @@ int main(int ac, char** av)
ipv4_addr addr{10000};
listen_options lo;
lo.reuse_address = true;
test t(engine.posix_listen(make_ipv4_address(addr), lo));
test t(engine().posix_listen(make_ipv4_address(addr), lo));
t.start_accept();
engine.run();
engine().run();
return 0;
}

View File

@@ -33,18 +33,18 @@ public:
boost::program_options::variables_map configuration;
auto opts = reactor::get_options_description();
bpo::store(bpo::command_line_parser(0, nullptr).options(opts).run(), configuration);
engine.configure(configuration);
engine.when_started().then([this] {
engine().configure(configuration);
engine().when_started().then([this] {
return run_test_case();
}).rescue([] (auto get) {
try {
get();
engine.exit(0);
engine().exit(0);
} catch (...) {
std::terminate();
}
});
engine.run();
engine().run();
});
t.join();
}

View File

@@ -19,7 +19,7 @@ public:
void start(ipv4_addr server_addr) {
std::cout << "Sending to " << server_addr << std::endl;
_chan = engine.net().make_udp_channel();
_chan = engine().net().make_udp_channel();
_stats_timer.set_callback([this] {
std::cout << "Out: " << n_sent << " pps, \t";

View File

@@ -17,7 +17,7 @@ private:
public:
void start(uint16_t port) {
ipv4_addr listen_addr{port};
_chan = engine.net().make_udp_channel(listen_addr);
_chan = engine().net().make_udp_channel(listen_addr);
_stats_timer.set_callback([this] {
std::cout << "Out: " << _n_sent << " pps" << std::endl;

View File

@@ -52,7 +52,7 @@ public:
}
void start(int chunk_size, bool copy, size_t mem_size) {
ipv4_addr listen_addr{10000};
_chan = engine.net().make_udp_channel(listen_addr);
_chan = engine().net().make_udp_channel(listen_addr);
std::cout << "Listening on " << listen_addr << std::endl;