Merge seastar upstream

This commit is contained in:
Avi Kivity
2015-06-14 17:47:32 +03:00
12 changed files with 347 additions and 50 deletions

View File

@@ -215,6 +215,7 @@ tests = [
'tests/fstream_test',
'tests/distributed_test',
'tests/rpc',
'tests/semaphore_test',
]
# urchin
@@ -483,6 +484,7 @@ deps = {
'tests/timertest': ['tests/timertest.cc'] + core,
'tests/futures_test': ['tests/futures_test.cc'] + core + boost_test_lib,
'tests/foreign_ptr_test': ['tests/foreign_ptr_test.cc'] + core + boost_test_lib,
'tests/semaphore_test': ['tests/semaphore_test.cc'] + core + boost_test_lib,
'tests/smp_test': ['tests/smp_test.cc'] + core,
'tests/thread_test': ['tests/thread_test.cc'] + core + boost_test_lib,
'tests/thread_context_switch': ['tests/thread_context_switch.cc'] + core,

View File

@@ -36,6 +36,9 @@
#include <fcntl.h>
#include <sys/eventfd.h>
#include <boost/thread/barrier.hpp>
#include <boost/algorithm/string/classification.hpp>
#include <boost/algorithm/string/split.hpp>
#include <boost/iterator/counting_iterator.hpp>
#include <atomic>
#include <dirent.h>
#ifdef HAVE_DPDK
@@ -45,6 +48,7 @@
#endif
#include "prefetch.hh"
#include <exception>
#include <regex>
#ifdef __GNUC__
#include <iostream>
#include <system_error>
@@ -1466,14 +1470,59 @@ reactor::get_options_description() {
return opts;
}
// We need a wrapper class, because boost::program_options wants validate()
// (below) to be in the same namespace as the type it is validating.
struct cpuset_wrapper {
resource::cpuset value;
};
// Overload for boost program options parsing/validation
void validate(boost::any& v,
const std::vector<std::string>& values,
cpuset_wrapper* target_type, int) {
using namespace boost::program_options;
static std::regex r("(\\d+-)?(\\d+)(,(\\d+-)?(\\d+))*");
validators::check_first_occurrence(v);
// Extract the first string from 'values'. If there is more than
// one string, it's an error, and exception will be thrown.
auto&& s = validators::get_single_string(values);
std::smatch match;
if (std::regex_match(s, match, r)) {
std::vector<std::string> ranges;
boost::split(ranges, s, boost::is_any_of(","));
cpuset_wrapper ret;
for (auto&& range: ranges) {
std::string beg = range;
std::string end = range;
auto dash = range.find('-');
if (dash != range.npos) {
beg = range.substr(0, dash);
end = range.substr(dash + 1);
}
auto b = boost::lexical_cast<unsigned>(beg);
auto e = boost::lexical_cast<unsigned>(end);
if (b > e) {
throw validation_error(validation_error::invalid_option_value);
}
for (auto i = b; i <= e; ++i) {
std::cout << "adding " << i << "\n";
ret.value.insert(i);
}
}
v = std::move(ret);
} else {
throw validation_error(validation_error::invalid_option_value);
}
}
boost::program_options::options_description
smp::get_options_description()
{
namespace bpo = boost::program_options;
bpo::options_description opts("SMP options");
auto cpus = resource::nr_processing_units();
opts.add_options()
("smp,c", bpo::value<unsigned>()->default_value(cpus), "number of threads")
("smp,c", bpo::value<unsigned>(), "number of threads (default: one per CPU)")
("cpuset", bpo::value<cpuset_wrapper>(), "CPUs to use (in cpuset(7) format; default: all))")
("memory,m", bpo::value<std::string>(), "memory to use, in bytes (ex: 4G) (default: all)")
("reserve-memory", bpo::value<std::string>()->default_value("512M"), "memory reserved to OS")
("hugepages", bpo::value<std::string>(), "path to accessible hugetlbfs mount (typically /dev/hugepages/something)")
@@ -1544,7 +1593,19 @@ void smp::configure(boost::program_options::variables_map configuration)
{
smp::count = 1;
smp::_tmain = std::this_thread::get_id();
smp::count = configuration["smp"].as<unsigned>();
auto nr_cpus = resource::nr_processing_units();
resource::cpuset cpu_set;
std::copy(boost::counting_iterator<unsigned>(0), boost::counting_iterator<unsigned>(nr_cpus),
std::inserter(cpu_set, cpu_set.end()));
if (configuration.count("cpuset")) {
cpu_set = configuration["cpuset"].as<cpuset_wrapper>().value;
}
if (configuration.count("smp")) {
nr_cpus = configuration["smp"].as<unsigned>();
} else {
nr_cpus = cpu_set.size();
}
smp::count = nr_cpus;
resource::configuration rc;
if (configuration.count("memory")) {
rc.total_memory = parse_memory_size(configuration["memory"].as<std::string>());
@@ -1577,6 +1638,7 @@ void smp::configure(boost::program_options::variables_map configuration)
hugepages_path = configuration["hugepages"].as<std::string>();
}
rc.cpus = smp::count;
rc.cpu_set = std::move(cpu_set);
std::vector<resource::cpu> allocations = resource::allocate(rc);
smp::pin(allocations[0].cpu_id);
memory::configure(allocations[0].mem, hugepages_path);

View File

@@ -49,12 +49,12 @@
#include <boost/lockfree/spsc_queue.hpp>
#include <boost/optional.hpp>
#include <boost/program_options.hpp>
#include <set>
#include "util/eclipse.hh"
#include "future.hh"
#include "posix.hh"
#include "apply.hh"
#include "sstring.hh"
#include "timer-set.hh"
#include "deleter.hh"
#include "net/api.hh"
#include "temporary_buffer.hh"
@@ -64,6 +64,7 @@
#include "core/scattered_message.hh"
#include "core/enum.hh"
#include <boost/range/irange.hpp>
#include "timer.hh"
#ifdef HAVE_OSV
#include <osv/sched.hh>
@@ -77,7 +78,6 @@ namespace scollectd { class registration; }
class reactor;
class pollable_fd;
class pollable_fd_state;
class lowres_clock;
struct free_deleter {
void operator()(void* p) { ::free(p); }
@@ -93,42 +93,6 @@ std::unique_ptr<CharType[], free_deleter> allocate_aligned_buffer(size_t size, s
return std::unique_ptr<CharType[], free_deleter>(reinterpret_cast<CharType*>(ret));
}
using clock_type = std::chrono::high_resolution_clock;
template <typename Clock = std::chrono::high_resolution_clock>
class timer {
public:
typedef typename Clock::time_point time_point;
typedef typename Clock::duration duration;
typedef Clock clock;
private:
using callback_t = std::function<void()>;
boost::intrusive::list_member_hook<> _link;
callback_t _callback;
time_point _expiry;
std::experimental::optional<duration> _period;
bool _armed = false;
bool _queued = false;
bool _expired = false;
void readd_periodic();
void arm_state(time_point until, std::experimental::optional<duration> period);
public:
timer() = default;
explicit timer(callback_t&& callback);
~timer();
future<> expired();
void set_callback(callback_t&& callback);
void arm(time_point until, std::experimental::optional<duration> period = {});
void rearm(time_point until, std::experimental::optional<duration> period = {});
void arm(duration delta);
void arm_periodic(duration delta);
bool armed() const { return _armed; }
bool cancel();
time_point get_timeout();
friend class reactor;
friend class seastar::timer_set<timer, &timer::_link>;
};
class lowres_clock {
public:
typedef int64_t rep;

View File

@@ -42,8 +42,8 @@ size_t div_roundup(size_t num, size_t denom) {
}
static unsigned find_memory_depth(hwloc_topology_t& topology) {
auto obj = hwloc_get_pu_obj_by_os_index(topology, 0);
auto depth = hwloc_get_type_depth(topology, HWLOC_OBJ_PU);
auto obj = hwloc_get_next_obj_by_depth(topology, depth, nullptr);
while (!obj->memory.local_memory && obj) {
obj = hwloc_get_ancestor_obj_by_depth(topology, --depth, obj);
@@ -68,6 +68,26 @@ std::vector<cpu> allocate(configuration c) {
hwloc_topology_init(&topology);
auto free_hwloc = defer([&] { hwloc_topology_destroy(topology); });
hwloc_topology_load(topology);
if (c.cpu_set) {
auto bm = hwloc_bitmap_alloc();
auto free_bm = defer([&] { hwloc_bitmap_free(bm); });
for (auto idx : *c.cpu_set) {
hwloc_bitmap_set(bm, idx);
}
auto r = hwloc_topology_restrict(topology, bm,
HWLOC_RESTRICT_FLAG_ADAPT_DISTANCES
| HWLOC_RESTRICT_FLAG_ADAPT_MISC
| HWLOC_RESTRICT_FLAG_ADAPT_IO);
if (r == -1) {
if (errno == ENOMEM) {
throw std::bad_alloc();
}
if (errno == EINVAL) {
throw std::runtime_error("bad cpuset");
}
abort();
}
}
auto machine_depth = hwloc_get_type_depth(topology, HWLOC_OBJ_MACHINE);
assert(hwloc_get_nbobjs_by_depth(topology, machine_depth) == 1);
auto machine = hwloc_get_obj_by_depth(topology, machine_depth, 0);
@@ -152,7 +172,8 @@ std::vector<cpu> allocate(configuration c) {
if (mem > available_memory) {
throw std::runtime_error("insufficient physical memory");
}
auto procs = c.cpus.value_or(nr_processing_units());
auto cpuset_procs = c.cpu_set ? c.cpu_set->size() : nr_processing_units();
auto procs = c.cpus.value_or(cpuset_procs);
std::vector<cpu> ret;
ret.reserve(procs);
for (unsigned i = 0; i < procs; ++i) {

View File

@@ -25,6 +25,7 @@
#include <cstdlib>
#include <experimental/optional>
#include <vector>
#include <set>
cpu_set_t cpuid_to_cpuset(unsigned cpuid);
@@ -32,10 +33,13 @@ namespace resource {
using std::experimental::optional;
using cpuset = std::set<unsigned>;
struct configuration {
optional<size_t> total_memory;
optional<size_t> reserve_memory; // if total_memory not specified
optional<size_t> cpus;
optional<cpuset> cpu_set;
};
struct memory {

View File

@@ -33,6 +33,7 @@
/// - \ref future-util Utililty functions for working with futures
/// - \ref memory-module Memory management
/// - \ref networking-module TCP/IP networking
/// - \ref thread-module Support for traditional threaded execution
#include "sstring.hh"
#include "future.hh"
@@ -62,7 +63,7 @@ enum class open_flags;
/// The native stack supports zero-copy on both transmit
/// and receive, and is implemented using seastar's high
/// performance, lockless sharded design. The network stack
/// can be selected with the \c --network-stack command-line
/// can be selected with the \c \--network-stack command-line
/// parameter.
/// \addtogroup networking-module

View File

@@ -25,6 +25,7 @@
#include "future.hh"
#include "circular_buffer.hh"
#include <stdexcept>
#include "timer.hh"
class broken_semaphore : public std::exception {
public:
@@ -33,10 +34,23 @@ public:
}
};
class semaphore_timed_out : public std::exception {
public:
virtual const char* what() const noexcept {
return "Semaphore timedout";
}
};
class semaphore {
private:
size_t _count;
circular_buffer<std::pair<promise<>, size_t>> _wait_list;
struct entry {
promise<> pr;
size_t nr;
timer<> tr;
entry(promise<>&& pr_, size_t nr_) : pr(std::move(pr_)), nr(nr_) {}
};
circular_buffer<entry> _wait_list;
public:
semaphore(size_t count = 1) : _count(count) {}
future<> wait(size_t nr = 1) {
@@ -46,15 +60,31 @@ public:
}
promise<> pr;
auto fut = pr.get_future();
_wait_list.push_back({ std::move(pr), nr });
_wait_list.push_back(entry(std::move(pr), nr));
return fut;
}
future<> wait(typename timer<>::duration timeout, size_t nr = 1) {
auto fut = wait(nr);
if (!fut.available()) {
auto& e = _wait_list.back();
e.tr.set_callback([&e, this] {
e.pr.set_exception(semaphore_timed_out());
e.nr = 0;
signal(0);
});
e.tr.arm(timeout);
}
return std::move(fut);
}
void signal(size_t nr = 1) {
_count += nr;
while (!_wait_list.empty() && _wait_list.front().second <= _count) {
while (!_wait_list.empty() && _wait_list.front().nr <= _count) {
auto& x = _wait_list.front();
_count -= x.second;
x.first.set_value();
if (x.nr) {
_count -= x.nr;
x.pr.set_value();
x.tr.cancel();
}
_wait_list.pop_front();
}
}
@@ -89,7 +119,8 @@ void semaphore::broken(const Exception& ex) {
auto xp = std::make_exception_ptr(ex);
while (!_wait_list.empty()) {
auto& x = _wait_list.front();
x.first.set_exception(xp);
x.pr.set_exception(xp);
x.tr.cancel();
_wait_list.pop_front();
}
}

View File

@@ -24,6 +24,8 @@
#include "posix.hh"
#include <ucontext.h>
/// \cond internal
namespace seastar {
thread_local thread_context* g_current_thread;
@@ -118,3 +120,5 @@ void init() {
}
/// \endcond

View File

@@ -29,10 +29,45 @@
#include <setjmp.h>
#include <type_traits>
/// \defgroup thread-module Seastar threads
///
/// Seastar threads provide an execution environment where blocking
/// is tolerated; you can issue I/O, and wait for it in the same function,
/// rather then establishing a callback to be called with \ref future<>::then().
///
/// Seastar threads are not the same as operating system threads:
/// - seastar threads are cooperative; they are never preempted except
/// at blocking points (see below)
/// - seastar threads always run on the same core they were launched on
///
/// Like other seastar code, seastar threads may not issue blocking system calls.
///
/// A seastar thread blocking point is any function that returns a \ref future<>.
/// you block by calling \ref future<>::get(); this waits for the future to become
/// available, and in the meanwhile, other seastar threads and seastar non-threaded
/// code may execute.
///
/// Example:
/// \code
/// seastar::thread th([] {
/// sleep(5s).get(); // blocking point
/// });
/// \endcode
///
/// An easy way to launch a thread and carry out some computation, and return a
/// result from this execution is by using the \ref seastar::async() function.
/// The result is returned as a future, so that non-threaded code can wait for
/// the thread to terminate and yield a result.
/// Seastar API namespace
namespace seastar {
/// \addtogroup thread-module
/// @{
class thread;
/// \cond internal
class thread_context;
namespace thread_impl {
@@ -69,6 +104,9 @@ public:
friend void thread_impl::switch_out(thread_context*);
};
/// \endcond
/// \brief thread - stateful thread of execution
///
/// Threads allow using seastar APIs in a blocking manner,
@@ -125,6 +163,17 @@ thread::join() {
/// \param func a callable to be executed in a thread
/// \param args a parameter pack to be forwarded to \c func.
/// \return whatever \c func returns, as a future.
///
/// Example:
/// \code
/// future<int> compute_sum(int a, int b) {
/// return seastar::async([a, b] {
/// // some blocking code:
/// sleep(1s).get();
/// return a + b;
/// });
/// }
/// \endcode
template <typename Func, typename... Args>
inline
futurize_t<std::result_of_t<std::decay_t<Func>(std::decay_t<Args>...)>>
@@ -147,4 +196,6 @@ async(Func&& func, Args&&... args) {
});
}
/// @}
}

70
core/timer.hh Normal file
View File

@@ -0,0 +1,70 @@
/*
* This file is open source software, licensed to you under the terms
* of the Apache License, Version 2.0 (the "License"). See the NOTICE file
* distributed with this work for additional information regarding copyright
* ownership. You may not use this file except in compliance with the License.
*
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
/*
* Copyright 2015 Cloudius Systems
*/
#pragma once
#include <experimental/optional>
#include <atomic>
#include "future.hh"
#include "timer-set.hh"
using clock_type = std::chrono::high_resolution_clock;
template <typename Clock = std::chrono::high_resolution_clock>
class timer {
public:
typedef typename Clock::time_point time_point;
typedef typename Clock::duration duration;
typedef Clock clock;
private:
using callback_t = std::function<void()>;
boost::intrusive::list_member_hook<> _link;
callback_t _callback;
time_point _expiry;
std::experimental::optional<duration> _period;
bool _armed = false;
bool _queued = false;
bool _expired = false;
void readd_periodic();
void arm_state(time_point until, std::experimental::optional<duration> period);
public:
timer() = default;
timer(timer&& t) noexcept : _callback(std::move(t._callback)), _expiry(std::move(t._expiry)), _period(std::move(t._period)),
_armed(t._armed), _queued(t._queued), _expired(t._expired) {
_link.swap_nodes(t._link);
t._queued = false;
t._armed = false;
}
explicit timer(callback_t&& callback);
~timer();
future<> expired();
void set_callback(callback_t&& callback);
void arm(time_point until, std::experimental::optional<duration> period = {});
void rearm(time_point until, std::experimental::optional<duration> period = {});
void arm(duration delta);
void arm_periodic(duration delta);
bool armed() const { return _armed; }
bool cancel();
time_point get_timeout();
friend class reactor;
friend class seastar::timer_set<timer, &timer::_link>;
};

86
tests/semaphore_test.cc Normal file
View File

@@ -0,0 +1,86 @@
/*
* This file is open source software, licensed to you under the terms
* of the Apache License, Version 2.0 (the "License"). See the NOTICE file
* distributed with this work for additional information regarding copyright
* ownership. You may not use this file except in compliance with the License.
*
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
/*
* Copyright (C) 2015 Cloudius Systems, Ltd.
*/
#include "core/thread.hh"
#include "core/do_with.hh"
#include "test-utils.hh"
#include "core/sstring.hh"
#include "core/reactor.hh"
#include "core/semaphore.hh"
#include "core/do_with.hh"
#include "core/future-util.hh"
#include "core/sleep.hh"
using namespace seastar;
using namespace std::chrono_literals;
SEASTAR_TEST_CASE(test_semaphore_1) {
return do_with(std::make_pair(semaphore(0), 0), [] (std::pair<semaphore, int>& x) {
x.first.wait().then([&x] {
x.second++;
});
x.first.signal();
return sleep(10ms).then([&x] {
BOOST_REQUIRE_EQUAL(x.second, 1);
});
});
}
SEASTAR_TEST_CASE(test_semaphore_2) {
return do_with(std::make_pair(semaphore(0), 0), [] (std::pair<semaphore, int>& x) {
x.first.wait().then([&x] {
x.second++;
});
return sleep(10ms).then([&x] {
BOOST_REQUIRE_EQUAL(x.second, 0);
});
});
}
SEASTAR_TEST_CASE(test_semaphore_timeout_1) {
return do_with(std::make_pair(semaphore(0), 0), [] (std::pair<semaphore, int>& x) {
x.first.wait(10ms).then([&x] {
x.second++;
});
sleep(3ms).then([&x] {
x.first.signal();
});
return sleep(20ms).then([&x] {
BOOST_REQUIRE_EQUAL(x.second, 1);
});
});
}
SEASTAR_TEST_CASE(test_semaphore_timeout_2) {
return do_with(std::make_pair(semaphore(0), 0), [] (std::pair<semaphore, int>& x) {
x.first.wait(3ms).then([&x] {
x.second++;
});
sleep(10ms).then([&x] {
x.first.signal();
});
return sleep(20ms).then([&x] {
BOOST_REQUIRE_EQUAL(x.second, 0);
});
});
}

View File

@@ -24,6 +24,7 @@
#include "core/do_with.hh"
#include "test-utils.hh"
#include "core/sstring.hh"
#include "core/reactor.hh"
#include "core/semaphore.hh"
#include "core/do_with.hh"
#include "core/future-util.hh"