Merge branch 'master' of github.com:cloudius-systems/seastar into db

Conflicts:
	configure.py
This commit is contained in:
Avi Kivity
2015-03-15 20:34:04 +02:00
9 changed files with 166 additions and 144 deletions

View File

@@ -167,7 +167,7 @@ apps = [
tests += urchin_tests
all_artifacts = apps + tests
all_artifacts = apps + tests + ['libseastar.a', 'seastar.pc']
arg_parser = argparse.ArgumentParser('Configure seastar')
arg_parser.add_argument('--static', dest = 'static', action = 'store_const', default = '',
@@ -307,6 +307,8 @@ urchin_core = (['database.cc',
+ core + libnet)
deps = {
'libseastar.a' : core + libnet,
'seastar.pc': [],
'seastar': ['main.cc'] + urchin_core,
'tests/test-reactor': ['tests/test-reactor.cc'] + core,
'apps/httpd/httpd': ['http/common.cc', 'http/routes.cc', 'json/json_elements.cc', 'json/formatter.cc', 'http/matcher.cc', 'http/mime_types.cc', 'http/httpd.cc', 'http/reply.cc', 'http/request_parser.rl', 'apps/httpd/main.cc'] + libnet + core,
@@ -420,6 +422,9 @@ with open(buildfile, 'w') as f:
rule ragel
command = ragel -G2 -o $out $in
description = RAGEL $out
rule gen
command = echo -e $text > $out
description = GEN $out
''').format(**globals()))
for mode in build_modes:
modeval = modes[mode]
@@ -438,6 +443,9 @@ with open(buildfile, 'w') as f:
command = $cxx $cxxflags_{mode} $ldflags -o $out $in $libs $libs_{mode}
description = LINK $out
pool = link_pool
rule ar.{mode}
command = rm -f $out; ar cr $out $in; ranlib $out
description = AR $out
rule thrift.{mode}
command = thrift -gen cpp:cob_style -out $builddir/{mode}/gen $in
description = THRIFT $in
@@ -463,9 +471,24 @@ with open(buildfile, 'w') as f:
objs += dep.objects('$builddir/' + mode + '/gen')
if isinstance(dep, Antlr3Grammar):
objs += dep.objects('$builddir/' + mode + '/gen')
f.write('build $builddir/{}/{}: link.{} {}\n'.format(mode, binary, mode, str.join(' ', objs)))
if has_thrift:
f.write(' libs = -lthrift -lboost_system $libs\n')
if binary.endswith('.pc'):
vars = modeval.copy()
vars.update(globals())
pc = textwrap.dedent('''\
Name: Seastar
URL: http://seastar-project.org/
Description: Advanced C++ framework for high-performance server applications on modern hardware.
Version: 1.0
Libs: -L{srcdir}/{builddir} -Wl,--whole-archive -lseastar -Wl,--no-whole-archive {dbgflag} -Wl,--no-as-needed {static} {pie} -fvisibility=hidden -pthread {user_ldflags} {libs} {sanitize_libs}
Cflags: -std=gnu++1y {dbgflag} {fpie} -Wall -Werror -fvisibility=hidden -pthread -I{srcdir} -I{srcdir}/{builddir}/gen {user_cflags} {warnings} {defines} {sanitize} {opt}
''').format(builddir = 'build/' + mode, srcdir = os.getcwd(), **vars)
f.write('build $builddir/{}/{}: gen\n text = {}\n'.format(mode, binary, repr(pc)))
elif binary.endswith('.a'):
f.write('build $builddir/{}/{}: ar.{} {}\n'.format(mode, binary, mode, str.join(' ', objs)))
else:
f.write('build $builddir/{}/{}: link.{} {}\n'.format(mode, binary, mode, str.join(' ', objs)))
if has_thrift:
f.write(' libs = -lthrift -lboost_system $libs\n')
for src in srcs:
if src.endswith('.cc'):
obj = '$builddir/' + mode + '/' + src.replace('.cc', '.o')

View File

@@ -152,7 +152,7 @@ public:
futurize_t<std::result_of_t<Func(Service&)>>
invoke_on(unsigned id, Func&& func) {
auto inst = _instances[id];
return smp::submit_to(id, [inst, func] {
return smp::submit_to(id, [inst, func = std::forward<Func>(func)] () mutable {
return func(*inst);
});
}

View File

@@ -256,15 +256,6 @@ struct future_state<> {
void forward_to(promise<>& pr) noexcept;
};
template <typename Func, typename... T>
struct future_task final : task, future_state<T...> {
Func _func;
future_task(Func&& func) : _func(std::move(func)) {}
virtual void run() noexcept {
func(*this);
}
};
template <typename... T>
class promise {
future<T...>* _future = nullptr;
@@ -324,7 +315,7 @@ private:
struct task_with_state final : task {
task_with_state(Func&& func) : _func(std::move(func)) {}
virtual void run() noexcept override {
_func(_state);
_func(std::move(_state));
}
future_state<T...> _state;
Func _func;
@@ -431,7 +422,7 @@ private:
struct task_with_ready_state final : task {
task_with_ready_state(Func&& func, future_state<T...>&& state) : _state(std::move(state)), _func(std::move(func)) {}
virtual void run() noexcept override {
_func(_state);
_func(std::move(_state));
}
future_state<T...> _state;
Func _func;
@@ -444,6 +435,32 @@ private:
_promise = nullptr;
}
}
template <typename Ret, typename Func, typename Param>
futurize_t<Ret> then(Func&& func, Param&& param) noexcept {
using futurator = futurize<Ret>;
using P = typename futurator::promise_type;
if (state()->available() && (++future_avail_count % 256)) {
try {
return futurator::apply(std::forward<Func>(func), param(std::move(*state())));
} catch (...) {
P p;
p.set_exception(std::current_exception());
return p.get_future();
}
}
P pr;
auto fut = pr.get_future();
schedule([pr = std::move(pr), func = std::forward<Func>(func), param = std::forward<Param>(param)] (auto&& state) mutable {
try {
futurator::apply(std::forward<Func>(func), param(std::move(state))).forward_to(std::move(pr));
} catch (...) {
pr.set_exception(std::current_exception());
}
});
return fut;
}
public:
using value_type = std::tuple<T...>;
using promise_type = promise<T...>;
@@ -484,26 +501,13 @@ public:
template <typename Func>
futurize_t<std::result_of_t<Func(T&&...)>> then(Func&& func) noexcept {
using futurator = futurize<std::result_of_t<Func(T&&...)>>;
if (state()->available() && (++future_avail_count % 256)) {
try {
return futurator::apply(std::forward<Func>(func), std::move(state()->get()));
} catch (...) {
typename futurator::promise_type p;
p.set_exception(std::current_exception());
return p.get_future();
}
}
typename futurator::promise_type pr;
auto fut = pr.get_future();
schedule([pr = std::move(pr), func = std::forward<Func>(func)] (auto& state) mutable {
try {
futurator::apply(std::forward<Func>(func), state.get()).forward_to(std::move(pr));
} catch (...) {
pr.set_exception(std::current_exception());
}
});
return fut;
return then<std::result_of_t<Func(T&&...)>>(std::forward<Func>(func), [] (future_state<T...>&& state) { return state.get(); });
}
template <typename Func>
futurize_t<std::result_of_t<Func(future<T...>)>>
then_wrapped(Func&& func) noexcept {
return then<std::result_of_t<Func(future<T...>)>>(std::forward<Func>(func), [] (future_state<T...>&& state) { return future(std::move(state)); });
}
void forward_to(promise<T...>&& pr) noexcept {
@@ -516,35 +520,6 @@ public:
}
}
template <typename Func>
futurize_t<std::result_of_t<Func(future<T...>)>>
then_wrapped(Func&& func) noexcept {
using futurator = futurize<std::result_of_t<Func(future<T...>)>>;
using P = typename futurator::promise_type;
if (state()->available()) {
try {
return futurator::apply(std::forward<Func>(func), std::move(*this));
} catch (...) {
P pr;
pr.set_exception(std::current_exception());
return pr.get_future();
}
}
P pr;
auto next_fut = pr.get_future();
_promise->schedule([func = std::forward<Func>(func), pr = std::move(pr)] (auto& state) mutable {
try {
futurator::apply(std::forward<Func>(func), future(std::move(state)))
.forward_to(std::move(pr));
} catch (...) {
pr.set_exception(std::current_exception());
}
});
_promise->_future = nullptr;
_promise = nullptr;
return next_fut;
}
/**
* Finally continuation for statements that require waiting for the result. I.e. you need to "finally" call
* a function that returns a possibly unavailable future.
@@ -552,9 +527,10 @@ public:
* is ignored. I.e. the original return value (the future upon which you are making this call) will be preserved.
*/
template <typename Func>
future<T...> finally(Func&& func, std::enable_if_t<is_future<std::result_of_t<Func()>>::value, void*> = nullptr) noexcept {
return then_wrapped([func = std::forward<Func>(func)](future<T...> result) {
return func().then_wrapped([result = std::move(result)](auto f_res) mutable {
future<T...> finally(Func&& func) noexcept {
return then_wrapped([func = std::forward<Func>(func)](future<T...> result) mutable {
using futurator = futurize<std::result_of_t<Func()>>;
return futurator::apply(std::forward<Func>(func)).then_wrapped([result = std::move(result)](auto f_res) mutable {
try {
f_res.get(); // force excepion if one
return std::move(result);
@@ -565,35 +541,7 @@ public:
});
}
template <typename Func>
future<T...> finally(Func&& func, std::enable_if_t<!is_future<std::result_of_t<Func()>>::value, void*> = nullptr) noexcept {
promise<T...> pr;
auto f = pr.get_future();
if (state()->available()) {
try {
func();
} catch (...) {
pr.set_exception(std::current_exception());
return f;
}
state()->forward_to(pr);
return f;
}
_promise->schedule([pr = std::move(pr), func = std::forward<Func>(func)] (auto& state) mutable {
try {
func();
} catch (...) {
pr.set_exception(std::current_exception());
return;
}
state.forward_to(pr);
});
_promise->_future = nullptr;
_promise = nullptr;
return f;
}
future<> or_terminate() && noexcept {
future<> or_terminate() noexcept {
return then_wrapped([] (auto&& f) {
try {
f.get();

View File

@@ -189,7 +189,7 @@ public:
return {};
}
throw_system_error_on(r == -1);
return { r };
return { ssize_t(r) };
}
boost::optional<size_t> recvmsg(msghdr* mh, int flags) {
auto r = ::recvmsg(_fd, mh, flags);

View File

@@ -1563,6 +1563,7 @@ reactor_backend_osv::enable_timer(clock_type::time_point when) {
void engine_exit(std::exception_ptr eptr) {
if (!eptr) {
engine().exit(0);
return;
}
#ifndef __GNUC__
std::cerr << "Exiting on unhandled exception.\n";

View File

@@ -433,17 +433,19 @@ class smp_message_queue {
virtual future<> process() = 0;
virtual void complete() = 0;
};
template <typename Func, typename Future>
template <typename Func>
struct async_work_item : work_item {
Func _func;
using value_type = typename Future::value_type;
using futurator = futurize<std::result_of_t<Func()>>;
using future_type = typename futurator::type;
using value_type = typename future_type::value_type;
std::experimental::optional<value_type> _result;
std::exception_ptr _ex; // if !_result
typename Future::promise_type _promise; // used on local side
typename futurator::promise_type _promise; // used on local side
async_work_item(Func&& func) : _func(std::move(func)) {}
virtual future<> process() override {
try {
return this->_func().then_wrapped([this] (auto&& f) {
return futurator::apply(this->_func).then_wrapped([this] (auto&& f) {
try {
_result = f.get();
} catch (...) {
@@ -463,7 +465,7 @@ class smp_message_queue {
_promise.set_exception(std::move(_ex));
}
}
Future get_future() { return _promise.get_future(); }
future_type get_future() { return _promise.get_future(); }
};
union tx_side {
tx_side() {}
@@ -477,9 +479,8 @@ class smp_message_queue {
public:
smp_message_queue();
template <typename Func>
std::result_of_t<Func()> submit(Func func) {
using future = std::result_of_t<Func()>;
auto wi = new async_work_item<Func, future>(std::move(func));
futurize_t<std::result_of_t<Func()>> submit(Func&& func) {
auto wi = new async_work_item<Func>(std::forward<Func>(func));
auto fut = wi->get_future();
submit_item(wi);
return fut;
@@ -881,29 +882,13 @@ public:
static bool main_thread() { return std::this_thread::get_id() == _tmain; }
template <typename Func>
static std::result_of_t<Func()> submit_to(unsigned t, Func func,
std::enable_if_t<returns_future<Func>::value, void*> = nullptr) {
static futurize_t<std::result_of_t<Func()>> submit_to(unsigned t, Func&& func) {
if (t == engine().cpu_id()) {
return func();
return futurize<std::result_of_t<Func()>>::apply(std::forward<Func>(func));
} else {
return _qs[t][engine().cpu_id()].submit(std::move(func));
return _qs[t][engine().cpu_id()].submit(std::forward<Func>(func));
}
}
template <typename Func>
static future<std::result_of_t<Func()>> submit_to(unsigned t, Func func,
std::enable_if_t<!returns_future<Func>::value && !returns_void<Func>::value, void*> = nullptr) {
return submit_to(t, [func = std::move(func)] () mutable {
return make_ready_future<std::result_of_t<Func()>>(func());
});
}
template <typename Func>
static future<> submit_to(unsigned t, Func func,
std::enable_if_t<!returns_future<Func>::value && returns_void<Func>::value, void*> = nullptr) {
return submit_to(t, [func = std::move(func)] () mutable {
func();
return make_ready_future<>();
});
}
static bool poll_queues() {
size_t got = 0;
for (unsigned i = 0; i < count; i++) {

66
core/unaligned.hh Normal file
View File

@@ -0,0 +1,66 @@
/*
* This file is open source software, licensed to you under the terms
* of the Apache License, Version 2.0 (the "License"). See the NOTICE file
* distributed with this work for additional information regarding copyright
* ownership. You may not use this file except in compliance with the License.
*
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
/*
* Copyright (C) 2015 Cloudius Systems, Ltd.
*/
#pragma once
// The following unaligned_cast<T*>(p) is a portable replacement for
// reinterpret_cast<T*>(p) which should be used every time address p
// is not guaranteed to be properly aligned to alignof(T).
//
// On architectures like x86 and ARM, where unaligned access is allowed,
// unaligned_cast will behave the same as reinterpret_cast and will generate
// the same code.
//
// Certain architectures (e.g., MIPS) make it extremely slow or outright
// forbidden to use ordinary machine instructions on a primitive type at an
// unaligned addresses - e.g., access a uint32_t at an address which is not
// a multiple of 4. Gcc's "undefined behavior sanitizer" (enabled in our debug
// build) also catches such unaligned accesses and reports them as errors,
// even when running on x86.
//
// Therefore, reinterpret_cast<int32_t*> on an address which is not guaranteed
// to be a multiple of 4 may generate extremely slow code or runtime errors,
// and must be avoided. The compiler needs to be told about the unaligned
// access, so it can generate reasonably-efficient code for the access
// (in MIPS, this means generating two instructions "lwl" and "lwr", instead
// of the one instruction "lw" which faults on unaligned/ access). The way to
// tell the compiler this is with __attribute__((packed)). This will also
// cause the sanitizer not to generate runtime alignment checks for this
// access.
template <typename T>
struct unaligned {
T raw;
unaligned() = default;
unaligned(T x) : raw(x) {}
unaligned& operator=(const T& x) { raw = x; return *this; }
operator T() const { return raw; }
} __attribute__((packed));
template <typename T, typename F>
inline auto unaligned_cast(F* p) {
return reinterpret_cast<unaligned<std::remove_pointer_t<T>>*>(p);
}
template <typename T, typename F>
inline auto unaligned_cast(const F* p) {
return reinterpret_cast<const unaligned<std::remove_pointer_t<T>>*>(p);
}

View File

@@ -26,6 +26,8 @@
#include <iosfwd>
#include <utility>
#include "core/unaligned.hh"
inline uint64_t ntohq(uint64_t v) {
return __builtin_bswap64(v);
}
@@ -56,25 +58,12 @@ inline int32_t hton(int32_t x) { return htonl(x); }
inline int64_t ntoh(int64_t x) { return ntohq(x); }
inline int64_t hton(int64_t x) { return htonq(x); }
// Wrapper around a primitive type to provide an unaligned version.
// This is because gcc (correctly) doesn't allow binding an unaligned
// scalar variable to a reference, and (unfortunately) doesn't allow
// specifying unaligned references.
//
// So, packed<uint32_t>& is our way of passing a reference (or pointer)
// to a uint32_t around, without losing the knowledge about its alignment
// or lack thereof.
// Define net::packed<> using unaligned<> from unaligned.hh.
template <typename T>
struct packed {
T raw;
packed() = default;
packed(T x) : raw(x) {}
packed& operator=(const T& x) { raw = x; return *this; }
operator T() const { return raw; }
struct packed : public unaligned<T> {
using unaligned<T>::unaligned; // inherit constructors
template <typename Adjuster>
void adjust_endianness(Adjuster a) { a(raw); }
void adjust_endianness(Adjuster a) { a(this->raw); }
} __attribute__((packed));
template <typename T>

View File

@@ -54,12 +54,22 @@ int main(int ac, char** av) {
});
app.run(ac, av, [&app] {
auto fname = app.configuration()["file"].as<std::string>();
engine().open_file_dma(fname, open_flags::ro | open_flags::create).then([] (file f) {
engine().open_file_dma(fname, open_flags::ro).then([] (file f) {
auto r = make_shared<reader>(std::move(f));
r->is.consume(*r).then([r] {
print("%d lines\n", r->count);
engine().exit(0);
});
}).then_wrapped([] (future<> f) {
try {
f.get();
engine().exit(0);
} catch (std::exception& ex) {
std::cout << ex.what() << "\n";
engine().exit(1);
} catch (...) {
std::cout << "unknown exception\n";
engine().exit(0);
}
});
});
}