From c4c5899f898fc17ba2f6754d6a72687f22836f1c Mon Sep 17 00:00:00 2001 From: Gleb Natapov Date: Wed, 18 Feb 2015 17:14:15 +0200 Subject: [PATCH 1/3] net: handle arp resolution errors in tcp Pass timeouts up the calling chain and schedule retry if waiter list is too long. --- net/tcp.hh | 24 +++++++++++++++++++----- 1 file changed, 19 insertions(+), 5 deletions(-) diff --git a/net/tcp.hh b/net/tcp.hh index 66cfcc5bad..066edfe3b6 100644 --- a/net/tcp.hh +++ b/net/tcp.hh @@ -323,7 +323,21 @@ private: void output() { if (!_poll_active) { _poll_active = true; - _tcp.poll_tcb(_foreign_ip, this->shared_from_this()); + _tcp.poll_tcb(_foreign_ip, this->shared_from_this()).rescue([this] (auto get) { + try { + get(); + } catch(arp_queue_full_error& ex) { + // retry later + _poll_active = false; + this->start_retransmit_timer(); + } catch(arp_timeout_error& ex) { + if (this->in_state(SYN_SENT)) { + _connect_done.set_exception(ex); + this->cleanup(); + } + // in other states connection should time out + } + }); } } future<> connect_done() { @@ -561,7 +575,7 @@ public: listener listen(uint16_t port, size_t queue_length = 100); future connect(socket_address sa); net::hw_features hw_features() { return _inet._inet.hw_features(); } - void poll_tcb(ipaddr to, lw_shared_ptr tcb); + future<> poll_tcb(ipaddr to, lw_shared_ptr tcb); private: void send_packet_without_tcb(ipaddr from, ipaddr to, packet p); void respond_with_reset(tcp_hdr* rth, ipaddr local_ip, ipaddr foreign_ip); @@ -596,9 +610,9 @@ tcp::tcp(inet_type& inet) : _inet(inet), _e(_rd()) { } template -void tcp::poll_tcb(ipaddr to, lw_shared_ptr tcb) { - _inet.get_l2_dst_address(to).then([this, tcb = std::move(tcb)] (ethernet_address dst) { - _poll_tcbs.emplace_back(std::move(tcb), dst); +future<> tcp::poll_tcb(ipaddr to, lw_shared_ptr tcb) { + return _inet.get_l2_dst_address(to).then([this, tcb = std::move(tcb)] (ethernet_address dst) { + _poll_tcbs.emplace_back(std::move(tcb), dst); }); } From 861d2625b23b1cf299a6d135699f306ba2308b6a Mon Sep 17 00:00:00 2001 From: Glauber Costa Date: Wed, 18 Feb 2015 15:46:09 -0500 Subject: [PATCH 2/3] file_stream: proper seek support. Our file_stream interface supports seek, but when we try to seek to arbitrary locations that are smaller than an aio-boundary (say, for instance, f->seek(4)), we will end up not being able to perform the read. We need to guarantee the reads are aligned, and will then present to the caller the buffer properly offset. Signed-off-by: Glauber Costa --- core/fstream.cc | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/core/fstream.cc b/core/fstream.cc index fe07627cc1..e258884ab2 100644 --- a/core/fstream.cc +++ b/core/fstream.cc @@ -17,10 +17,18 @@ file_data_source_impl::get() { auto q = static_cast(p); temporary_buffer buf(q, _buffer_size, make_free_deleter(p)); auto old_pos = _pos; - _pos += _buffer_size; + // dma_read needs to be aligned. It doesn't have to be page-aligned, + // though, and we could get away with something much smaller. However, if + // we start reading in things outside page boundaries, we will end up with + // various pages around, some of them with overlapping ranges. Those would + // be very challenging to cache. + old_pos &= ~4095; + auto front = _pos - old_pos; + _pos += _buffer_size - front; return _file.dma_read(old_pos, q, _buffer_size).then( - [buf = std::move(buf)] (size_t size) mutable { + [buf = std::move(buf), front] (size_t size) mutable { buf.trim(size); + buf.trim_front(front); return make_ready_future>(std::move(buf)); }); } From a8698fa17c62542a98e928d395f38e66f6ff148c Mon Sep 17 00:00:00 2001 From: Avi Kivity Date: Wed, 18 Feb 2015 18:03:34 +0200 Subject: [PATCH 3/3] core: demangle stdout When using print() to debug on smp, it is very annoying to get interleaved output. Fix by wrapping stdout with a fake stream that has a line buffer for each thread. --- configure.py | 1 + core/app-template.cc | 3 ++ core/stdio.cc | 76 ++++++++++++++++++++++++++++++++++++++++++++ core/stdio.hh | 12 +++++++ 4 files changed, 92 insertions(+) create mode 100644 core/stdio.cc create mode 100644 core/stdio.hh diff --git a/configure.py b/configure.py index dab6f98d2b..ddd7be6049 100755 --- a/configure.py +++ b/configure.py @@ -141,6 +141,7 @@ core = [ 'core/posix.cc', 'core/memory.cc', 'core/resource.cc', + 'core/stdio.cc', 'core/scollectd.cc', 'core/app-template.cc', 'core/dpdk_rte.cc', diff --git a/core/app-template.cc b/core/app-template.cc index b638eeba96..3910ef4727 100644 --- a/core/app-template.cc +++ b/core/app-template.cc @@ -6,6 +6,7 @@ #include "core/reactor.hh" #include "core/scollectd.hh" #include "core/print.hh" +#include "stdio.hh" #include #include #include @@ -70,6 +71,8 @@ app_template::run(int ac, char ** av, std::function&& func) { std::cout << _opts << "\n"; return 1; } + // intentional leak + stdout = smp_synchronize_lines(stdout); smp::configure(configuration); _configuration = {std::move(configuration)}; engine().when_started().then([this] { diff --git a/core/stdio.cc b/core/stdio.cc new file mode 100644 index 0000000000..ab1555c1de --- /dev/null +++ b/core/stdio.cc @@ -0,0 +1,76 @@ +/* + * Copyright (C) 2015 Cloudius Systems, Ltd. + */ + + +#include "stdio.hh" +#include +#include +#include +#include +#include +#include +#include + +class spinlock { + pthread_spinlock_t _l; +public: + spinlock() { pthread_spin_init(&_l, PTHREAD_PROCESS_PRIVATE); } + ~spinlock() { pthread_spin_destroy(&_l); } + void lock() { pthread_spin_lock(&_l); } + void unlock() { pthread_spin_unlock(&_l); } +}; + +class smp_file { + FILE* _out; + spinlock _lock; + boost::thread_specific_ptr> _buffer; + static constexpr size_t max = 2000; +public: + smp_file(FILE* out) : _out(out) {} + ssize_t write(const char* buffer, size_t size) { + auto& b = *_buffer; + b.insert(b.end(), buffer, buffer + size); + size_t now = 0; + if (b.size() >= max) { + now = b.size(); + } else { + auto lf = std::find(b.rbegin(), b.rend(), '\n'); + if (lf != b.rend()) { + auto remain = lf - b.rbegin(); + now = b.size() - remain; + } + } + if (now) { + auto ret = fwrite(b.data(), 1, now, _out); + b.erase(b.begin(), b.begin() + now); + return ret; + } else { + return 0; + } + } +}; + +static smp_file* to_smp_file(void* cookie) { + return static_cast(cookie); +} + +FILE* +smp_synchronize_lines(FILE* out) { + auto p = std::make_unique(out); + cookie_io_functions_t vtable = {}; + vtable.write = [] (void* ck, const char* b, size_t s) { return to_smp_file(ck)->write(b, s); }; + vtable.close = [] (void* ck) { delete to_smp_file(ck); return 0; }; + auto ret = fopencookie(p.get(), "w", vtable); + if (!ret) { + return ret; + } + // disable buffering so that writes to ret don't get mixed + // up but are sent to smp_file immediately instead. + setvbuf(ret, nullptr, _IONBF, 0); + p.release(); + return ret; +} + + + diff --git a/core/stdio.hh b/core/stdio.hh new file mode 100644 index 0000000000..85694292cd --- /dev/null +++ b/core/stdio.hh @@ -0,0 +1,12 @@ +/* + * Copyright (C) 2015 Cloudius Systems, Ltd. + */ + +#pragma once + +#include + +// Returns a FILE* that writes all its output to @out, but attempts +// not to mix lines if multiple threads write to it simultaenously. +FILE* smp_synchronize_lines(FILE* out); +