storage_service: Introduce session concept
This commit is contained in:
@@ -550,6 +550,7 @@ scylla_tests = set([
|
||||
'test/boost/network_topology_strategy_test',
|
||||
'test/boost/token_metadata_test',
|
||||
'test/boost/tablets_test',
|
||||
'test/boost/sessions_test',
|
||||
'test/boost/nonwrapping_range_test',
|
||||
'test/boost/observable_test',
|
||||
'test/boost/partitioner_test',
|
||||
@@ -1091,6 +1092,7 @@ scylla_core = (['message/messaging_service.cc',
|
||||
'locator/util.cc',
|
||||
'service/client_state.cc',
|
||||
'service/storage_service.cc',
|
||||
'service/session.cc',
|
||||
'service/misc_services.cc',
|
||||
'service/pager/paging_state.cc',
|
||||
'service/pager/query_pagers.cc',
|
||||
|
||||
74
service/session.cc
Normal file
74
service/session.cc
Normal file
@@ -0,0 +1,74 @@
|
||||
/*
|
||||
* Copyright (C) 2023-present ScyllaDB
|
||||
*/
|
||||
|
||||
/*
|
||||
* SPDX-License-Identifier: AGPL-3.0-or-later
|
||||
*/
|
||||
|
||||
#include "service/session.hh"
|
||||
#include "log.hh"
|
||||
|
||||
namespace service {
|
||||
|
||||
static logging::logger slogger("session");
|
||||
|
||||
|
||||
session::guard::guard(session& s)
|
||||
: _session(&s)
|
||||
, _holder(s._gate.hold()) {
|
||||
}
|
||||
|
||||
session::guard::~guard() {
|
||||
}
|
||||
|
||||
session_manager::session_manager() {
|
||||
create_session(default_session_id);
|
||||
}
|
||||
|
||||
session::guard session_manager::enter_session(session_id id) {
|
||||
auto i = _sessions.find(id);
|
||||
if (i == _sessions.end()) {
|
||||
throw std::runtime_error(fmt::format("Session not found: {}", id));
|
||||
}
|
||||
auto guard = i->second->enter();
|
||||
slogger.debug("session {} entered", id);
|
||||
return guard;
|
||||
}
|
||||
|
||||
void session_manager::create_session(session_id id) {
|
||||
auto [i, created] = _sessions.emplace(id, std::make_unique<session>(id));
|
||||
if (created) {
|
||||
slogger.debug("session {} created", id);
|
||||
} else {
|
||||
slogger.debug("session {} already exists", id);
|
||||
}
|
||||
}
|
||||
|
||||
void session_manager::initiate_close_of_sessions_except(const std::unordered_set<session_id>& keep) {
|
||||
for (auto&& [id, session] : _sessions) {
|
||||
if (id != default_session_id && !keep.contains(id)) {
|
||||
if (!session->is_closing()) {
|
||||
_closing_sessions.push_front(*session);
|
||||
}
|
||||
session->start_closing();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
future<> session_manager::drain_closing_sessions() {
|
||||
auto lock = co_await get_units(_session_drain_sem, 1);
|
||||
auto i = _closing_sessions.begin();
|
||||
while (i != _closing_sessions.end()) {
|
||||
session& s = *i;
|
||||
++i;
|
||||
auto id = s.id();
|
||||
slogger.debug("draining session {}", id);
|
||||
co_await s.close();
|
||||
if (_sessions.erase(id)) {
|
||||
slogger.debug("session {} closed", id);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
} // namespace service
|
||||
129
service/session.hh
Normal file
129
service/session.hh
Normal file
@@ -0,0 +1,129 @@
|
||||
/*
|
||||
* Copyright (C) 2023-present ScyllaDB
|
||||
*/
|
||||
|
||||
/*
|
||||
* SPDX-License-Identifier: AGPL-3.0-or-later
|
||||
*/
|
||||
|
||||
#pragma once
|
||||
|
||||
#include "utils/UUID.hh"
|
||||
|
||||
#include <seastar/core/coroutine.hh>
|
||||
#include <seastar/core/gate.hh>
|
||||
#include <seastar/core/shared_future.hh>
|
||||
#include <seastar/core/semaphore.hh>
|
||||
|
||||
#include <boost/intrusive/list.hpp>
|
||||
#include <unordered_set>
|
||||
|
||||
namespace service {
|
||||
|
||||
using session_id = utils::tagged_uuid<struct session_id_tag>;
|
||||
|
||||
// We want it be different than default-constructed session_id to catch mistakes.
|
||||
constexpr session_id default_session_id = session_id(
|
||||
utils::UUID(0x81e7fc5a8d4411ee, 0x8577325096b39f47)); // timeuuid 2023-11-27 16:46:27.182089.0 UTC
|
||||
|
||||
/// Session is used to track execution of work related to some greater task, identified by session_id.
|
||||
/// Work can enter the session using enter(), and is considered to be part of the session
|
||||
/// as long as the guard returned by enter() is alive.
|
||||
///
|
||||
/// Session goes over the following states monotonically:
|
||||
/// 1) open - accepts work via enter()
|
||||
/// 2) closing - rejects work via enter()
|
||||
/// 3) closed - rejects work via enter(), and no guards are alive anymore
|
||||
///
|
||||
/// Sessions are removed only after they are closed, it's impossible to have a session::guard of a session
|
||||
/// which is not in the registry.
|
||||
class session {
|
||||
public:
|
||||
using link_type = bi::list_member_hook<bi::link_mode<bi::auto_unlink>>;
|
||||
private:
|
||||
session_id _id;
|
||||
seastar::gate _gate;
|
||||
std::optional<shared_future<>> _closed;
|
||||
link_type _link;
|
||||
public:
|
||||
using list_type = boost::intrusive::list<session,
|
||||
boost::intrusive::member_hook<session, session::link_type, &session::_link>,
|
||||
boost::intrusive::constant_time_size<false>>;
|
||||
public:
|
||||
class guard {
|
||||
session* _session;
|
||||
seastar::gate::holder _holder;
|
||||
public:
|
||||
explicit guard(session& s);
|
||||
guard(const guard&) noexcept = default;
|
||||
guard(guard&&) noexcept = default;
|
||||
guard& operator=(guard&&) noexcept = default;
|
||||
guard& operator=(const guard&) noexcept = default;
|
||||
~guard();
|
||||
|
||||
void check() const {
|
||||
if (!valid()) {
|
||||
throw seastar::abort_requested_exception();
|
||||
}
|
||||
}
|
||||
|
||||
[[nodiscard]] bool valid() const {
|
||||
return !_session->_closed;
|
||||
}
|
||||
};
|
||||
|
||||
explicit session(session_id id) : _id(id) {}
|
||||
|
||||
guard enter() {
|
||||
return guard(*this);
|
||||
};
|
||||
|
||||
/// No new work is admitted to enter the session after this.
|
||||
/// Can be called many times.
|
||||
void start_closing() noexcept {
|
||||
if (!_closed) {
|
||||
_closed = seastar::shared_future<>(_gate.close());
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns true iff the session is not open.
|
||||
/// Calling enter() in this state will fail.
|
||||
bool is_closing() const {
|
||||
return bool(_closed);
|
||||
}
|
||||
|
||||
session_id id() const {
|
||||
return _id;
|
||||
}
|
||||
|
||||
/// Post-condition of successfully resolved future: There are no guards alive for this session, and
|
||||
/// and it's impossible to create more such guards later.
|
||||
/// Can be called concurrently.
|
||||
future<> close() {
|
||||
start_closing();
|
||||
return _closed->get_future();
|
||||
}
|
||||
};
|
||||
|
||||
class session_manager {
|
||||
std::unordered_map<session_id, std::unique_ptr<session>> _sessions;
|
||||
session::list_type _closing_sessions;
|
||||
seastar::semaphore _session_drain_sem{1};
|
||||
public:
|
||||
session_manager();
|
||||
|
||||
session::guard enter_session(session_id id);
|
||||
|
||||
/// Creates a session on this shard if it doesn't exist yet.
|
||||
/// If the session already exists does nothing.
|
||||
void create_session(session_id);
|
||||
|
||||
/// Calls start_closing() on all sessions except those in keep.
|
||||
void initiate_close_of_sessions_except(const std::unordered_set<session_id>& keep);
|
||||
|
||||
/// Post-condition: All sessions which are in closing state before the call will be in closed state after the call.
|
||||
/// Can be called concurrently.
|
||||
future<> drain_closing_sessions();
|
||||
};
|
||||
|
||||
} // namespace service
|
||||
@@ -10,6 +10,7 @@
|
||||
*/
|
||||
|
||||
#include "storage_service.hh"
|
||||
#include "service/session.hh"
|
||||
#include "dht/boot_strapper.hh"
|
||||
#include <seastar/core/distributed.hh>
|
||||
#include <seastar/util/defer.hh>
|
||||
|
||||
94
test/boost/sessions_test.cc
Normal file
94
test/boost/sessions_test.cc
Normal file
@@ -0,0 +1,94 @@
|
||||
/*
|
||||
* Copyright (C) 2023-present ScyllaDB
|
||||
*/
|
||||
|
||||
/*
|
||||
* SPDX-License-Identifier: AGPL-3.0-or-later
|
||||
*/
|
||||
|
||||
|
||||
#include "test/lib/scylla_test_case.hh"
|
||||
#include <seastar/testing/thread_test_case.hh>
|
||||
#include "test/lib/random_utils.hh"
|
||||
#include "test/lib/cql_test_env.hh"
|
||||
#include "test/lib/log.hh"
|
||||
|
||||
#include "service/session.hh"
|
||||
|
||||
using namespace service;
|
||||
|
||||
SEASTAR_TEST_CASE(test_default_session_always_exists) {
|
||||
return seastar::async([] {
|
||||
session_manager mgr;
|
||||
auto guard = mgr.enter_session(default_session_id);
|
||||
guard.check();
|
||||
|
||||
mgr.initiate_close_of_sessions_except({});
|
||||
mgr.drain_closing_sessions().get();
|
||||
|
||||
guard = mgr.enter_session(default_session_id);
|
||||
guard.check();
|
||||
});
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(test_default_constructible_session_does_not_exist) {
|
||||
return seastar::async([] {
|
||||
session_manager mgr;
|
||||
session_id id;
|
||||
// For safety, we don't want to treat unset id same as default_session_id.
|
||||
BOOST_REQUIRE_THROW(mgr.enter_session(id), std::runtime_error);
|
||||
});
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(test_session_closing) {
|
||||
return seastar::async([] {
|
||||
session_manager mgr;
|
||||
|
||||
auto id = session_id(utils::make_random_uuid());
|
||||
auto id2 = session_id(utils::make_random_uuid());
|
||||
auto id3 = session_id(utils::make_random_uuid());
|
||||
auto id4 = session_id(utils::make_random_uuid());
|
||||
|
||||
BOOST_REQUIRE_THROW(mgr.enter_session(id), std::runtime_error);
|
||||
|
||||
mgr.create_session(id);
|
||||
mgr.create_session(id2);
|
||||
|
||||
auto guard = mgr.enter_session(id);
|
||||
auto guard2 = mgr.enter_session(id2);
|
||||
|
||||
guard.check();
|
||||
guard2.check();
|
||||
|
||||
mgr.initiate_close_of_sessions_except({id});
|
||||
|
||||
BOOST_REQUIRE(guard.valid());
|
||||
BOOST_REQUIRE(!guard2.valid());
|
||||
|
||||
auto f = mgr.drain_closing_sessions();
|
||||
auto f2 = mgr.drain_closing_sessions(); // test concurrent drain
|
||||
BOOST_REQUIRE(!f.available()); // blocked by guard2
|
||||
|
||||
// Concurrent wait drain
|
||||
mgr.create_session(id3);
|
||||
mgr.initiate_close_of_sessions_except({id});
|
||||
mgr.create_session(id3); // no-op
|
||||
mgr.create_session(id4);
|
||||
|
||||
{
|
||||
auto _ = std::move(guard2);
|
||||
}
|
||||
|
||||
f.get();
|
||||
f2.get();
|
||||
|
||||
mgr.drain_closing_sessions().get();
|
||||
|
||||
BOOST_REQUIRE(guard.valid());
|
||||
|
||||
mgr.enter_session(id);
|
||||
mgr.enter_session(id4);
|
||||
BOOST_REQUIRE_THROW(mgr.enter_session(id2), std::runtime_error);
|
||||
BOOST_REQUIRE_THROW(mgr.enter_session(id3), std::runtime_error);
|
||||
});
|
||||
}
|
||||
@@ -211,7 +211,7 @@ struct tagged_uuid {
|
||||
}
|
||||
static tagged_uuid create_random_id() noexcept { return tagged_uuid{utils::make_random_uuid()}; }
|
||||
static constexpr tagged_uuid create_null_id() noexcept { return tagged_uuid{}; }
|
||||
explicit tagged_uuid(const utils::UUID& uuid) noexcept : id(uuid) {}
|
||||
explicit constexpr tagged_uuid(const utils::UUID& uuid) noexcept : id(uuid) {}
|
||||
constexpr tagged_uuid() = default;
|
||||
|
||||
const utils::UUID& uuid() const noexcept {
|
||||
|
||||
Reference in New Issue
Block a user