diff --git a/configure.py b/configure.py index 1427ee3191..1f1d0e4995 100755 --- a/configure.py +++ b/configure.py @@ -215,6 +215,7 @@ tests = [ 'tests/fstream_test', 'tests/distributed_test', 'tests/rpc', + 'tests/semaphore_test', ] # urchin @@ -483,6 +484,7 @@ deps = { 'tests/timertest': ['tests/timertest.cc'] + core, 'tests/futures_test': ['tests/futures_test.cc'] + core + boost_test_lib, 'tests/foreign_ptr_test': ['tests/foreign_ptr_test.cc'] + core + boost_test_lib, + 'tests/semaphore_test': ['tests/semaphore_test.cc'] + core + boost_test_lib, 'tests/smp_test': ['tests/smp_test.cc'] + core, 'tests/thread_test': ['tests/thread_test.cc'] + core + boost_test_lib, 'tests/thread_context_switch': ['tests/thread_context_switch.cc'] + core, diff --git a/core/reactor.cc b/core/reactor.cc index d89d265d30..4d0b084f67 100644 --- a/core/reactor.cc +++ b/core/reactor.cc @@ -36,6 +36,9 @@ #include #include #include +#include +#include +#include #include #include #ifdef HAVE_DPDK @@ -45,6 +48,7 @@ #endif #include "prefetch.hh" #include +#include #ifdef __GNUC__ #include #include @@ -1466,14 +1470,59 @@ reactor::get_options_description() { return opts; } +// We need a wrapper class, because boost::program_options wants validate() +// (below) to be in the same namespace as the type it is validating. +struct cpuset_wrapper { + resource::cpuset value; +}; + +// Overload for boost program options parsing/validation +void validate(boost::any& v, + const std::vector& values, + cpuset_wrapper* target_type, int) { + using namespace boost::program_options; + static std::regex r("(\\d+-)?(\\d+)(,(\\d+-)?(\\d+))*"); + validators::check_first_occurrence(v); + // Extract the first string from 'values'. If there is more than + // one string, it's an error, and exception will be thrown. + auto&& s = validators::get_single_string(values); + std::smatch match; + if (std::regex_match(s, match, r)) { + std::vector ranges; + boost::split(ranges, s, boost::is_any_of(",")); + cpuset_wrapper ret; + for (auto&& range: ranges) { + std::string beg = range; + std::string end = range; + auto dash = range.find('-'); + if (dash != range.npos) { + beg = range.substr(0, dash); + end = range.substr(dash + 1); + } + auto b = boost::lexical_cast(beg); + auto e = boost::lexical_cast(end); + if (b > e) { + throw validation_error(validation_error::invalid_option_value); + } + for (auto i = b; i <= e; ++i) { + std::cout << "adding " << i << "\n"; + ret.value.insert(i); + } + } + v = std::move(ret); + } else { + throw validation_error(validation_error::invalid_option_value); + } +} + boost::program_options::options_description smp::get_options_description() { namespace bpo = boost::program_options; bpo::options_description opts("SMP options"); - auto cpus = resource::nr_processing_units(); opts.add_options() - ("smp,c", bpo::value()->default_value(cpus), "number of threads") + ("smp,c", bpo::value(), "number of threads (default: one per CPU)") + ("cpuset", bpo::value(), "CPUs to use (in cpuset(7) format; default: all))") ("memory,m", bpo::value(), "memory to use, in bytes (ex: 4G) (default: all)") ("reserve-memory", bpo::value()->default_value("512M"), "memory reserved to OS") ("hugepages", bpo::value(), "path to accessible hugetlbfs mount (typically /dev/hugepages/something)") @@ -1544,7 +1593,19 @@ void smp::configure(boost::program_options::variables_map configuration) { smp::count = 1; smp::_tmain = std::this_thread::get_id(); - smp::count = configuration["smp"].as(); + auto nr_cpus = resource::nr_processing_units(); + resource::cpuset cpu_set; + std::copy(boost::counting_iterator(0), boost::counting_iterator(nr_cpus), + std::inserter(cpu_set, cpu_set.end())); + if (configuration.count("cpuset")) { + cpu_set = configuration["cpuset"].as().value; + } + if (configuration.count("smp")) { + nr_cpus = configuration["smp"].as(); + } else { + nr_cpus = cpu_set.size(); + } + smp::count = nr_cpus; resource::configuration rc; if (configuration.count("memory")) { rc.total_memory = parse_memory_size(configuration["memory"].as()); @@ -1577,6 +1638,7 @@ void smp::configure(boost::program_options::variables_map configuration) hugepages_path = configuration["hugepages"].as(); } rc.cpus = smp::count; + rc.cpu_set = std::move(cpu_set); std::vector allocations = resource::allocate(rc); smp::pin(allocations[0].cpu_id); memory::configure(allocations[0].mem, hugepages_path); diff --git a/core/reactor.hh b/core/reactor.hh index 7da5fb2bee..0a76a2da9c 100644 --- a/core/reactor.hh +++ b/core/reactor.hh @@ -49,12 +49,12 @@ #include #include #include +#include #include "util/eclipse.hh" #include "future.hh" #include "posix.hh" #include "apply.hh" #include "sstring.hh" -#include "timer-set.hh" #include "deleter.hh" #include "net/api.hh" #include "temporary_buffer.hh" @@ -64,6 +64,7 @@ #include "core/scattered_message.hh" #include "core/enum.hh" #include +#include "timer.hh" #ifdef HAVE_OSV #include @@ -77,7 +78,6 @@ namespace scollectd { class registration; } class reactor; class pollable_fd; class pollable_fd_state; -class lowres_clock; struct free_deleter { void operator()(void* p) { ::free(p); } @@ -93,42 +93,6 @@ std::unique_ptr allocate_aligned_buffer(size_t size, s return std::unique_ptr(reinterpret_cast(ret)); } -using clock_type = std::chrono::high_resolution_clock; - -template -class timer { -public: - typedef typename Clock::time_point time_point; - typedef typename Clock::duration duration; - typedef Clock clock; -private: - using callback_t = std::function; - boost::intrusive::list_member_hook<> _link; - callback_t _callback; - time_point _expiry; - std::experimental::optional _period; - bool _armed = false; - bool _queued = false; - bool _expired = false; - void readd_periodic(); - void arm_state(time_point until, std::experimental::optional period); -public: - timer() = default; - explicit timer(callback_t&& callback); - ~timer(); - future<> expired(); - void set_callback(callback_t&& callback); - void arm(time_point until, std::experimental::optional period = {}); - void rearm(time_point until, std::experimental::optional period = {}); - void arm(duration delta); - void arm_periodic(duration delta); - bool armed() const { return _armed; } - bool cancel(); - time_point get_timeout(); - friend class reactor; - friend class seastar::timer_set; -}; - class lowres_clock { public: typedef int64_t rep; diff --git a/core/resource.cc b/core/resource.cc index 337465243f..f7ce6cda32 100644 --- a/core/resource.cc +++ b/core/resource.cc @@ -42,8 +42,8 @@ size_t div_roundup(size_t num, size_t denom) { } static unsigned find_memory_depth(hwloc_topology_t& topology) { - auto obj = hwloc_get_pu_obj_by_os_index(topology, 0); auto depth = hwloc_get_type_depth(topology, HWLOC_OBJ_PU); + auto obj = hwloc_get_next_obj_by_depth(topology, depth, nullptr); while (!obj->memory.local_memory && obj) { obj = hwloc_get_ancestor_obj_by_depth(topology, --depth, obj); @@ -68,6 +68,26 @@ std::vector allocate(configuration c) { hwloc_topology_init(&topology); auto free_hwloc = defer([&] { hwloc_topology_destroy(topology); }); hwloc_topology_load(topology); + if (c.cpu_set) { + auto bm = hwloc_bitmap_alloc(); + auto free_bm = defer([&] { hwloc_bitmap_free(bm); }); + for (auto idx : *c.cpu_set) { + hwloc_bitmap_set(bm, idx); + } + auto r = hwloc_topology_restrict(topology, bm, + HWLOC_RESTRICT_FLAG_ADAPT_DISTANCES + | HWLOC_RESTRICT_FLAG_ADAPT_MISC + | HWLOC_RESTRICT_FLAG_ADAPT_IO); + if (r == -1) { + if (errno == ENOMEM) { + throw std::bad_alloc(); + } + if (errno == EINVAL) { + throw std::runtime_error("bad cpuset"); + } + abort(); + } + } auto machine_depth = hwloc_get_type_depth(topology, HWLOC_OBJ_MACHINE); assert(hwloc_get_nbobjs_by_depth(topology, machine_depth) == 1); auto machine = hwloc_get_obj_by_depth(topology, machine_depth, 0); @@ -152,7 +172,8 @@ std::vector allocate(configuration c) { if (mem > available_memory) { throw std::runtime_error("insufficient physical memory"); } - auto procs = c.cpus.value_or(nr_processing_units()); + auto cpuset_procs = c.cpu_set ? c.cpu_set->size() : nr_processing_units(); + auto procs = c.cpus.value_or(cpuset_procs); std::vector ret; ret.reserve(procs); for (unsigned i = 0; i < procs; ++i) { diff --git a/core/resource.hh b/core/resource.hh index b53f40ad63..d8f4ebc04a 100644 --- a/core/resource.hh +++ b/core/resource.hh @@ -25,6 +25,7 @@ #include #include #include +#include cpu_set_t cpuid_to_cpuset(unsigned cpuid); @@ -32,10 +33,13 @@ namespace resource { using std::experimental::optional; +using cpuset = std::set; + struct configuration { optional total_memory; optional reserve_memory; // if total_memory not specified optional cpus; + optional cpu_set; }; struct memory { diff --git a/core/seastar.hh b/core/seastar.hh index 396b51ac7c..3d82a6366c 100644 --- a/core/seastar.hh +++ b/core/seastar.hh @@ -33,6 +33,7 @@ /// - \ref future-util Utililty functions for working with futures /// - \ref memory-module Memory management /// - \ref networking-module TCP/IP networking +/// - \ref thread-module Support for traditional threaded execution #include "sstring.hh" #include "future.hh" @@ -62,7 +63,7 @@ enum class open_flags; /// The native stack supports zero-copy on both transmit /// and receive, and is implemented using seastar's high /// performance, lockless sharded design. The network stack -/// can be selected with the \c --network-stack command-line +/// can be selected with the \c \--network-stack command-line /// parameter. /// \addtogroup networking-module diff --git a/core/semaphore.hh b/core/semaphore.hh index f54d941a04..a4c7eaf794 100644 --- a/core/semaphore.hh +++ b/core/semaphore.hh @@ -25,6 +25,7 @@ #include "future.hh" #include "circular_buffer.hh" #include +#include "timer.hh" class broken_semaphore : public std::exception { public: @@ -33,10 +34,23 @@ public: } }; +class semaphore_timed_out : public std::exception { +public: + virtual const char* what() const noexcept { + return "Semaphore timedout"; + } +}; + class semaphore { private: size_t _count; - circular_buffer, size_t>> _wait_list; + struct entry { + promise<> pr; + size_t nr; + timer<> tr; + entry(promise<>&& pr_, size_t nr_) : pr(std::move(pr_)), nr(nr_) {} + }; + circular_buffer _wait_list; public: semaphore(size_t count = 1) : _count(count) {} future<> wait(size_t nr = 1) { @@ -46,15 +60,31 @@ public: } promise<> pr; auto fut = pr.get_future(); - _wait_list.push_back({ std::move(pr), nr }); + _wait_list.push_back(entry(std::move(pr), nr)); return fut; } + future<> wait(typename timer<>::duration timeout, size_t nr = 1) { + auto fut = wait(nr); + if (!fut.available()) { + auto& e = _wait_list.back(); + e.tr.set_callback([&e, this] { + e.pr.set_exception(semaphore_timed_out()); + e.nr = 0; + signal(0); + }); + e.tr.arm(timeout); + } + return std::move(fut); + } void signal(size_t nr = 1) { _count += nr; - while (!_wait_list.empty() && _wait_list.front().second <= _count) { + while (!_wait_list.empty() && _wait_list.front().nr <= _count) { auto& x = _wait_list.front(); - _count -= x.second; - x.first.set_value(); + if (x.nr) { + _count -= x.nr; + x.pr.set_value(); + x.tr.cancel(); + } _wait_list.pop_front(); } } @@ -89,7 +119,8 @@ void semaphore::broken(const Exception& ex) { auto xp = std::make_exception_ptr(ex); while (!_wait_list.empty()) { auto& x = _wait_list.front(); - x.first.set_exception(xp); + x.pr.set_exception(xp); + x.tr.cancel(); _wait_list.pop_front(); } } diff --git a/core/thread.cc b/core/thread.cc index a466201067..ac769e4dc3 100644 --- a/core/thread.cc +++ b/core/thread.cc @@ -24,6 +24,8 @@ #include "posix.hh" #include +/// \cond internal + namespace seastar { thread_local thread_context* g_current_thread; @@ -118,3 +120,5 @@ void init() { } + +/// \endcond diff --git a/core/thread.hh b/core/thread.hh index 8d43c313e1..7011b5a4d0 100644 --- a/core/thread.hh +++ b/core/thread.hh @@ -29,10 +29,45 @@ #include #include +/// \defgroup thread-module Seastar threads +/// +/// Seastar threads provide an execution environment where blocking +/// is tolerated; you can issue I/O, and wait for it in the same function, +/// rather then establishing a callback to be called with \ref future<>::then(). +/// +/// Seastar threads are not the same as operating system threads: +/// - seastar threads are cooperative; they are never preempted except +/// at blocking points (see below) +/// - seastar threads always run on the same core they were launched on +/// +/// Like other seastar code, seastar threads may not issue blocking system calls. +/// +/// A seastar thread blocking point is any function that returns a \ref future<>. +/// you block by calling \ref future<>::get(); this waits for the future to become +/// available, and in the meanwhile, other seastar threads and seastar non-threaded +/// code may execute. +/// +/// Example: +/// \code +/// seastar::thread th([] { +/// sleep(5s).get(); // blocking point +/// }); +/// \endcode +/// +/// An easy way to launch a thread and carry out some computation, and return a +/// result from this execution is by using the \ref seastar::async() function. +/// The result is returned as a future, so that non-threaded code can wait for +/// the thread to terminate and yield a result. + /// Seastar API namespace namespace seastar { +/// \addtogroup thread-module +/// @{ + class thread; + +/// \cond internal class thread_context; namespace thread_impl { @@ -69,6 +104,9 @@ public: friend void thread_impl::switch_out(thread_context*); }; +/// \endcond + + /// \brief thread - stateful thread of execution /// /// Threads allow using seastar APIs in a blocking manner, @@ -125,6 +163,17 @@ thread::join() { /// \param func a callable to be executed in a thread /// \param args a parameter pack to be forwarded to \c func. /// \return whatever \c func returns, as a future. +/// +/// Example: +/// \code +/// future compute_sum(int a, int b) { +/// return seastar::async([a, b] { +/// // some blocking code: +/// sleep(1s).get(); +/// return a + b; +/// }); +/// } +/// \endcode template inline futurize_t(std::decay_t...)>> @@ -147,4 +196,6 @@ async(Func&& func, Args&&... args) { }); } +/// @} + } diff --git a/core/timer.hh b/core/timer.hh new file mode 100644 index 0000000000..318266e78f --- /dev/null +++ b/core/timer.hh @@ -0,0 +1,70 @@ +/* + * This file is open source software, licensed to you under the terms + * of the Apache License, Version 2.0 (the "License"). See the NOTICE file + * distributed with this work for additional information regarding copyright + * ownership. You may not use this file except in compliance with the License. + * + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +/* + * Copyright 2015 Cloudius Systems + */ + +#pragma once + +#include +#include +#include "future.hh" +#include "timer-set.hh" + +using clock_type = std::chrono::high_resolution_clock; + +template +class timer { +public: + typedef typename Clock::time_point time_point; + typedef typename Clock::duration duration; + typedef Clock clock; +private: + using callback_t = std::function; + boost::intrusive::list_member_hook<> _link; + callback_t _callback; + time_point _expiry; + std::experimental::optional _period; + bool _armed = false; + bool _queued = false; + bool _expired = false; + void readd_periodic(); + void arm_state(time_point until, std::experimental::optional period); +public: + timer() = default; + timer(timer&& t) noexcept : _callback(std::move(t._callback)), _expiry(std::move(t._expiry)), _period(std::move(t._period)), + _armed(t._armed), _queued(t._queued), _expired(t._expired) { + _link.swap_nodes(t._link); + t._queued = false; + t._armed = false; + } + explicit timer(callback_t&& callback); + ~timer(); + future<> expired(); + void set_callback(callback_t&& callback); + void arm(time_point until, std::experimental::optional period = {}); + void rearm(time_point until, std::experimental::optional period = {}); + void arm(duration delta); + void arm_periodic(duration delta); + bool armed() const { return _armed; } + bool cancel(); + time_point get_timeout(); + friend class reactor; + friend class seastar::timer_set; +}; + diff --git a/tests/semaphore_test.cc b/tests/semaphore_test.cc new file mode 100644 index 0000000000..3d002fc860 --- /dev/null +++ b/tests/semaphore_test.cc @@ -0,0 +1,86 @@ +/* + * This file is open source software, licensed to you under the terms + * of the Apache License, Version 2.0 (the "License"). See the NOTICE file + * distributed with this work for additional information regarding copyright + * ownership. You may not use this file except in compliance with the License. + * + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/* + * Copyright (C) 2015 Cloudius Systems, Ltd. + */ + +#include "core/thread.hh" +#include "core/do_with.hh" +#include "test-utils.hh" +#include "core/sstring.hh" +#include "core/reactor.hh" +#include "core/semaphore.hh" +#include "core/do_with.hh" +#include "core/future-util.hh" +#include "core/sleep.hh" + +using namespace seastar; +using namespace std::chrono_literals; + +SEASTAR_TEST_CASE(test_semaphore_1) { + return do_with(std::make_pair(semaphore(0), 0), [] (std::pair& x) { + x.first.wait().then([&x] { + x.second++; + }); + x.first.signal(); + return sleep(10ms).then([&x] { + BOOST_REQUIRE_EQUAL(x.second, 1); + }); + }); +} + +SEASTAR_TEST_CASE(test_semaphore_2) { + return do_with(std::make_pair(semaphore(0), 0), [] (std::pair& x) { + x.first.wait().then([&x] { + x.second++; + }); + return sleep(10ms).then([&x] { + BOOST_REQUIRE_EQUAL(x.second, 0); + }); + }); +} + +SEASTAR_TEST_CASE(test_semaphore_timeout_1) { + return do_with(std::make_pair(semaphore(0), 0), [] (std::pair& x) { + x.first.wait(10ms).then([&x] { + x.second++; + }); + sleep(3ms).then([&x] { + x.first.signal(); + }); + return sleep(20ms).then([&x] { + BOOST_REQUIRE_EQUAL(x.second, 1); + }); + }); +} + +SEASTAR_TEST_CASE(test_semaphore_timeout_2) { + return do_with(std::make_pair(semaphore(0), 0), [] (std::pair& x) { + x.first.wait(3ms).then([&x] { + x.second++; + }); + sleep(10ms).then([&x] { + x.first.signal(); + }); + return sleep(20ms).then([&x] { + BOOST_REQUIRE_EQUAL(x.second, 0); + }); + }); +} + diff --git a/tests/thread_test.cc b/tests/thread_test.cc index 281eec423f..59b0fcc203 100644 --- a/tests/thread_test.cc +++ b/tests/thread_test.cc @@ -24,6 +24,7 @@ #include "core/do_with.hh" #include "test-utils.hh" #include "core/sstring.hh" +#include "core/reactor.hh" #include "core/semaphore.hh" #include "core/do_with.hh" #include "core/future-util.hh"