Merge branch 'master' of github.com:cloudius-systems/seastar into db

This commit is contained in:
Avi Kivity
2015-02-19 09:28:04 +02:00
6 changed files with 121 additions and 7 deletions

View File

@@ -186,6 +186,7 @@ core = [
'core/posix.cc',
'core/memory.cc',
'core/resource.cc',
'core/stdio.cc',
'core/scollectd.cc',
'core/app-template.cc',
'core/dpdk_rte.cc',

View File

@@ -6,6 +6,7 @@
#include "core/reactor.hh"
#include "core/scollectd.hh"
#include "core/print.hh"
#include "stdio.hh"
#include <boost/program_options.hpp>
#include <boost/make_shared.hpp>
#include <fstream>
@@ -70,6 +71,8 @@ app_template::run(int ac, char ** av, std::function<void ()>&& func) {
std::cout << _opts << "\n";
return 1;
}
// intentional leak
stdout = smp_synchronize_lines(stdout);
smp::configure(configuration);
_configuration = {std::move(configuration)};
engine().when_started().then([this] {

View File

@@ -17,10 +17,18 @@ file_data_source_impl::get() {
auto q = static_cast<char*>(p);
temporary_buffer<char> buf(q, _buffer_size, make_free_deleter(p));
auto old_pos = _pos;
_pos += _buffer_size;
// dma_read needs to be aligned. It doesn't have to be page-aligned,
// though, and we could get away with something much smaller. However, if
// we start reading in things outside page boundaries, we will end up with
// various pages around, some of them with overlapping ranges. Those would
// be very challenging to cache.
old_pos &= ~4095;
auto front = _pos - old_pos;
_pos += _buffer_size - front;
return _file.dma_read(old_pos, q, _buffer_size).then(
[buf = std::move(buf)] (size_t size) mutable {
[buf = std::move(buf), front] (size_t size) mutable {
buf.trim(size);
buf.trim_front(front);
return make_ready_future<temporary_buffer<char>>(std::move(buf));
});
}

76
core/stdio.cc Normal file
View File

@@ -0,0 +1,76 @@
/*
* Copyright (C) 2015 Cloudius Systems, Ltd.
*/
#include "stdio.hh"
#include <memory>
#include <pthread.h>
#include <mutex>
#include <unordered_map>
#include <boost/thread/tss.hpp>
#include <vector>
#include <algorithm>
class spinlock {
pthread_spinlock_t _l;
public:
spinlock() { pthread_spin_init(&_l, PTHREAD_PROCESS_PRIVATE); }
~spinlock() { pthread_spin_destroy(&_l); }
void lock() { pthread_spin_lock(&_l); }
void unlock() { pthread_spin_unlock(&_l); }
};
class smp_file {
FILE* _out;
spinlock _lock;
boost::thread_specific_ptr<std::vector<char>> _buffer;
static constexpr size_t max = 2000;
public:
smp_file(FILE* out) : _out(out) {}
ssize_t write(const char* buffer, size_t size) {
auto& b = *_buffer;
b.insert(b.end(), buffer, buffer + size);
size_t now = 0;
if (b.size() >= max) {
now = b.size();
} else {
auto lf = std::find(b.rbegin(), b.rend(), '\n');
if (lf != b.rend()) {
auto remain = lf - b.rbegin();
now = b.size() - remain;
}
}
if (now) {
auto ret = fwrite(b.data(), 1, now, _out);
b.erase(b.begin(), b.begin() + now);
return ret;
} else {
return 0;
}
}
};
static smp_file* to_smp_file(void* cookie) {
return static_cast<smp_file*>(cookie);
}
FILE*
smp_synchronize_lines(FILE* out) {
auto p = std::make_unique<smp_file>(out);
cookie_io_functions_t vtable = {};
vtable.write = [] (void* ck, const char* b, size_t s) { return to_smp_file(ck)->write(b, s); };
vtable.close = [] (void* ck) { delete to_smp_file(ck); return 0; };
auto ret = fopencookie(p.get(), "w", vtable);
if (!ret) {
return ret;
}
// disable buffering so that writes to ret don't get mixed
// up but are sent to smp_file immediately instead.
setvbuf(ret, nullptr, _IONBF, 0);
p.release();
return ret;
}

12
core/stdio.hh Normal file
View File

@@ -0,0 +1,12 @@
/*
* Copyright (C) 2015 Cloudius Systems, Ltd.
*/
#pragma once
#include <stdio.h>
// Returns a FILE* that writes all its output to @out, but attempts
// not to mix lines if multiple threads write to it simultaenously.
FILE* smp_synchronize_lines(FILE* out);

View File

@@ -323,7 +323,21 @@ private:
void output() {
if (!_poll_active) {
_poll_active = true;
_tcp.poll_tcb(_foreign_ip, this->shared_from_this());
_tcp.poll_tcb(_foreign_ip, this->shared_from_this()).rescue([this] (auto get) {
try {
get();
} catch(arp_queue_full_error& ex) {
// retry later
_poll_active = false;
this->start_retransmit_timer();
} catch(arp_timeout_error& ex) {
if (this->in_state(SYN_SENT)) {
_connect_done.set_exception(ex);
this->cleanup();
}
// in other states connection should time out
}
});
}
}
future<> connect_done() {
@@ -561,7 +575,7 @@ public:
listener listen(uint16_t port, size_t queue_length = 100);
future<connection> connect(socket_address sa);
net::hw_features hw_features() { return _inet._inet.hw_features(); }
void poll_tcb(ipaddr to, lw_shared_ptr<tcb> tcb);
future<> poll_tcb(ipaddr to, lw_shared_ptr<tcb> tcb);
private:
void send_packet_without_tcb(ipaddr from, ipaddr to, packet p);
void respond_with_reset(tcp_hdr* rth, ipaddr local_ip, ipaddr foreign_ip);
@@ -596,9 +610,9 @@ tcp<InetTraits>::tcp(inet_type& inet) : _inet(inet), _e(_rd()) {
}
template <typename InetTraits>
void tcp<InetTraits>::poll_tcb(ipaddr to, lw_shared_ptr<tcb> tcb) {
_inet.get_l2_dst_address(to).then([this, tcb = std::move(tcb)] (ethernet_address dst) {
_poll_tcbs.emplace_back(std::move(tcb), dst);
future<> tcp<InetTraits>::poll_tcb(ipaddr to, lw_shared_ptr<tcb> tcb) {
return _inet.get_l2_dst_address(to).then([this, tcb = std::move(tcb)] (ethernet_address dst) {
_poll_tcbs.emplace_back(std::move(tcb), dst);
});
}