storage_service: Introduce session concept

This commit is contained in:
Tomasz Grabiec
2023-10-26 00:03:25 +02:00
parent 2d4cd9c574
commit d3d83869ce
6 changed files with 301 additions and 1 deletions

View File

@@ -550,6 +550,7 @@ scylla_tests = set([
'test/boost/network_topology_strategy_test',
'test/boost/token_metadata_test',
'test/boost/tablets_test',
'test/boost/sessions_test',
'test/boost/nonwrapping_range_test',
'test/boost/observable_test',
'test/boost/partitioner_test',
@@ -1091,6 +1092,7 @@ scylla_core = (['message/messaging_service.cc',
'locator/util.cc',
'service/client_state.cc',
'service/storage_service.cc',
'service/session.cc',
'service/misc_services.cc',
'service/pager/paging_state.cc',
'service/pager/query_pagers.cc',

74
service/session.cc Normal file
View File

@@ -0,0 +1,74 @@
/*
* Copyright (C) 2023-present ScyllaDB
*/
/*
* SPDX-License-Identifier: AGPL-3.0-or-later
*/
#include "service/session.hh"
#include "log.hh"
namespace service {
static logging::logger slogger("session");
session::guard::guard(session& s)
: _session(&s)
, _holder(s._gate.hold()) {
}
session::guard::~guard() {
}
session_manager::session_manager() {
create_session(default_session_id);
}
session::guard session_manager::enter_session(session_id id) {
auto i = _sessions.find(id);
if (i == _sessions.end()) {
throw std::runtime_error(fmt::format("Session not found: {}", id));
}
auto guard = i->second->enter();
slogger.debug("session {} entered", id);
return guard;
}
void session_manager::create_session(session_id id) {
auto [i, created] = _sessions.emplace(id, std::make_unique<session>(id));
if (created) {
slogger.debug("session {} created", id);
} else {
slogger.debug("session {} already exists", id);
}
}
void session_manager::initiate_close_of_sessions_except(const std::unordered_set<session_id>& keep) {
for (auto&& [id, session] : _sessions) {
if (id != default_session_id && !keep.contains(id)) {
if (!session->is_closing()) {
_closing_sessions.push_front(*session);
}
session->start_closing();
}
}
}
future<> session_manager::drain_closing_sessions() {
auto lock = co_await get_units(_session_drain_sem, 1);
auto i = _closing_sessions.begin();
while (i != _closing_sessions.end()) {
session& s = *i;
++i;
auto id = s.id();
slogger.debug("draining session {}", id);
co_await s.close();
if (_sessions.erase(id)) {
slogger.debug("session {} closed", id);
}
}
}
} // namespace service

129
service/session.hh Normal file
View File

@@ -0,0 +1,129 @@
/*
* Copyright (C) 2023-present ScyllaDB
*/
/*
* SPDX-License-Identifier: AGPL-3.0-or-later
*/
#pragma once
#include "utils/UUID.hh"
#include <seastar/core/coroutine.hh>
#include <seastar/core/gate.hh>
#include <seastar/core/shared_future.hh>
#include <seastar/core/semaphore.hh>
#include <boost/intrusive/list.hpp>
#include <unordered_set>
namespace service {
using session_id = utils::tagged_uuid<struct session_id_tag>;
// We want it be different than default-constructed session_id to catch mistakes.
constexpr session_id default_session_id = session_id(
utils::UUID(0x81e7fc5a8d4411ee, 0x8577325096b39f47)); // timeuuid 2023-11-27 16:46:27.182089.0 UTC
/// Session is used to track execution of work related to some greater task, identified by session_id.
/// Work can enter the session using enter(), and is considered to be part of the session
/// as long as the guard returned by enter() is alive.
///
/// Session goes over the following states monotonically:
/// 1) open - accepts work via enter()
/// 2) closing - rejects work via enter()
/// 3) closed - rejects work via enter(), and no guards are alive anymore
///
/// Sessions are removed only after they are closed, it's impossible to have a session::guard of a session
/// which is not in the registry.
class session {
public:
using link_type = bi::list_member_hook<bi::link_mode<bi::auto_unlink>>;
private:
session_id _id;
seastar::gate _gate;
std::optional<shared_future<>> _closed;
link_type _link;
public:
using list_type = boost::intrusive::list<session,
boost::intrusive::member_hook<session, session::link_type, &session::_link>,
boost::intrusive::constant_time_size<false>>;
public:
class guard {
session* _session;
seastar::gate::holder _holder;
public:
explicit guard(session& s);
guard(const guard&) noexcept = default;
guard(guard&&) noexcept = default;
guard& operator=(guard&&) noexcept = default;
guard& operator=(const guard&) noexcept = default;
~guard();
void check() const {
if (!valid()) {
throw seastar::abort_requested_exception();
}
}
[[nodiscard]] bool valid() const {
return !_session->_closed;
}
};
explicit session(session_id id) : _id(id) {}
guard enter() {
return guard(*this);
};
/// No new work is admitted to enter the session after this.
/// Can be called many times.
void start_closing() noexcept {
if (!_closed) {
_closed = seastar::shared_future<>(_gate.close());
}
}
/// Returns true iff the session is not open.
/// Calling enter() in this state will fail.
bool is_closing() const {
return bool(_closed);
}
session_id id() const {
return _id;
}
/// Post-condition of successfully resolved future: There are no guards alive for this session, and
/// and it's impossible to create more such guards later.
/// Can be called concurrently.
future<> close() {
start_closing();
return _closed->get_future();
}
};
class session_manager {
std::unordered_map<session_id, std::unique_ptr<session>> _sessions;
session::list_type _closing_sessions;
seastar::semaphore _session_drain_sem{1};
public:
session_manager();
session::guard enter_session(session_id id);
/// Creates a session on this shard if it doesn't exist yet.
/// If the session already exists does nothing.
void create_session(session_id);
/// Calls start_closing() on all sessions except those in keep.
void initiate_close_of_sessions_except(const std::unordered_set<session_id>& keep);
/// Post-condition: All sessions which are in closing state before the call will be in closed state after the call.
/// Can be called concurrently.
future<> drain_closing_sessions();
};
} // namespace service

View File

@@ -10,6 +10,7 @@
*/
#include "storage_service.hh"
#include "service/session.hh"
#include "dht/boot_strapper.hh"
#include <seastar/core/distributed.hh>
#include <seastar/util/defer.hh>

View File

@@ -0,0 +1,94 @@
/*
* Copyright (C) 2023-present ScyllaDB
*/
/*
* SPDX-License-Identifier: AGPL-3.0-or-later
*/
#include "test/lib/scylla_test_case.hh"
#include <seastar/testing/thread_test_case.hh>
#include "test/lib/random_utils.hh"
#include "test/lib/cql_test_env.hh"
#include "test/lib/log.hh"
#include "service/session.hh"
using namespace service;
SEASTAR_TEST_CASE(test_default_session_always_exists) {
return seastar::async([] {
session_manager mgr;
auto guard = mgr.enter_session(default_session_id);
guard.check();
mgr.initiate_close_of_sessions_except({});
mgr.drain_closing_sessions().get();
guard = mgr.enter_session(default_session_id);
guard.check();
});
}
SEASTAR_TEST_CASE(test_default_constructible_session_does_not_exist) {
return seastar::async([] {
session_manager mgr;
session_id id;
// For safety, we don't want to treat unset id same as default_session_id.
BOOST_REQUIRE_THROW(mgr.enter_session(id), std::runtime_error);
});
}
SEASTAR_TEST_CASE(test_session_closing) {
return seastar::async([] {
session_manager mgr;
auto id = session_id(utils::make_random_uuid());
auto id2 = session_id(utils::make_random_uuid());
auto id3 = session_id(utils::make_random_uuid());
auto id4 = session_id(utils::make_random_uuid());
BOOST_REQUIRE_THROW(mgr.enter_session(id), std::runtime_error);
mgr.create_session(id);
mgr.create_session(id2);
auto guard = mgr.enter_session(id);
auto guard2 = mgr.enter_session(id2);
guard.check();
guard2.check();
mgr.initiate_close_of_sessions_except({id});
BOOST_REQUIRE(guard.valid());
BOOST_REQUIRE(!guard2.valid());
auto f = mgr.drain_closing_sessions();
auto f2 = mgr.drain_closing_sessions(); // test concurrent drain
BOOST_REQUIRE(!f.available()); // blocked by guard2
// Concurrent wait drain
mgr.create_session(id3);
mgr.initiate_close_of_sessions_except({id});
mgr.create_session(id3); // no-op
mgr.create_session(id4);
{
auto _ = std::move(guard2);
}
f.get();
f2.get();
mgr.drain_closing_sessions().get();
BOOST_REQUIRE(guard.valid());
mgr.enter_session(id);
mgr.enter_session(id4);
BOOST_REQUIRE_THROW(mgr.enter_session(id2), std::runtime_error);
BOOST_REQUIRE_THROW(mgr.enter_session(id3), std::runtime_error);
});
}

View File

@@ -211,7 +211,7 @@ struct tagged_uuid {
}
static tagged_uuid create_random_id() noexcept { return tagged_uuid{utils::make_random_uuid()}; }
static constexpr tagged_uuid create_null_id() noexcept { return tagged_uuid{}; }
explicit tagged_uuid(const utils::UUID& uuid) noexcept : id(uuid) {}
explicit constexpr tagged_uuid(const utils::UUID& uuid) noexcept : id(uuid) {}
constexpr tagged_uuid() = default;
const utils::UUID& uuid() const noexcept {