Files
scylladb/ent/ldap/ldap_connection.cc
Piotr Smaron 288f9b2b15 Introduce LDAP role manager & saslauthd authenticator
This PR extends authentication with 2 mechanisms:
- a new role_manager subclass, which allows managing users via
LDAP server,
- a new authenticator, which delegates plaintext authentication
to a running saslauthd daemon.

The features have been ported from the enterprise repository
with their test.py tests and the documentation as part of
changing license to source available.

Fixes: scylladb/scylla-enterprise#5000
Fixes: scylladb/scylla-enterprise#5001

Closes scylladb/scylladb#22030
2025-01-12 14:50:29 +02:00

488 lines
18 KiB
C++

/*
* Copyright (C) 2019 ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#define LDAP_DEPRECATED 1
#include "ldap_connection.hh"
#include <cerrno>
#include <cstring>
#include <fmt/format.h>
#include <stdexcept>
#include <string>
#include <seastar/core/seastar.hh>
#include <seastar/core/when_all.hh>
#include <seastar/util/later.hh>
#include <seastar/util/log.hh>
#include "seastarx.hh"
extern "C" {
// Declared in `ldap_pvt.h`, but this header is not usually installed by distributions even though
// it's considered public by upstream.
int ldap_init_fd(int, int, const char *, LDAP**);
}
namespace {
logger mylog{"ldap_connection"}; // `log` is taken by math.
constexpr int failure_code{-1}, success_code{0}; // LDAP return codes.
/// Helper function for Sockbuf_IO work.
ldap_connection* connection(Sockbuf_IO_Desc* sid) {
return reinterpret_cast<ldap_connection*>(sid->sbiod_pvt);
}
/// Sockbuf_IO setup function for ldap_connection.
int ssbi_setup(Sockbuf_IO_Desc* sid, void* arg) {
sid->sbiod_pvt = arg; // arg is ldap_connection, already set up.
return success_code;
}
/// Sockbuf_IO remove function for ldap_connection.
int ssbi_remove(Sockbuf_IO_Desc* sid) {
return success_code; // ldap_connection will be destructed by its owner.
}
void throw_if_failed(int status, const char* op, const ldap_connection& conn, int success = LDAP_SUCCESS) {
if (status != success) {
throw std::runtime_error(fmt::format("{} returned {}: {}", op, status, conn.get_error()));
}
}
} // anonymous namespace
std::mutex ldap_connection::_global_init_mutex;
void ldap_connection::ldap_deleter::operator()(LDAP* ld) {
mylog.trace("ldap_deleter: invoking unbind");
int status = ldap_unbind(ld);
if (status != LDAP_SUCCESS) {
mylog.error("ldap_unbind failed with status {}", status);
}
mylog.trace("ldap_deleter done");
}
ldap_connection::ldap_connection(seastar::connected_socket&& socket) :
_fd(file_desc::eventfd(0, EFD_NONBLOCK)) // Never ready for read, always ready for write.
, _socket(std::move(socket))
, _input_stream(_socket.input())
, _output_stream(_socket.output())
, _status(status::up)
, _read_consumer(now())
, _read_in_progress(false)
, _outstanding_write(now())
, _currently_polling(false) {
// Proactively initiate Seastar read, before ldap_connection::read() is first called.
read_ahead();
// Libldap determines if we're ready for an sbi_write by polling _fd. We're always ready to
// accept an sbi_write because we chain writes as continuations. Therefore, _fd always polls
// ready to write.
//
// Libldap determines if we're ready for an sbi_read by first calling sbi_ctrl(DATA_READY); only
// if that returns false does libldap then poll _fd. We are ready to accept an sbi_read when a
// prior Seastar read completed successfully and delivered some data into our _read_buffer.
// Only sbi_ctrl() knows that, which is why _fd always polls NOT READY to read.
//
// NB: libldap never actually reads or writes directly to _fd. It reads and writes through the
// custom Sockbuf_IO we provide it.
static constexpr int LDAP_PROTO_EXT = 4; // From ldap_pvt.h, which isn't always available.
mylog.trace("constructor invoking ldap_init");
LDAP* init_result;
{
std::lock_guard<std::mutex> global_init_lock{_global_init_mutex};
throw_if_failed(ldap_init_fd(_fd.get(), LDAP_PROTO_EXT, nullptr, &init_result), "ldap_init_fd", *this);
}
_ldap.reset(init_result);
static constexpr int opt_v3 = LDAP_VERSION3;
throw_if_failed(
ldap_set_option(_ldap.get(), LDAP_OPT_PROTOCOL_VERSION, &opt_v3), // Encouraged by ldap_set_option manpage.
"ldap_set_option protocol version",
*this,
LDAP_OPT_SUCCESS);
throw_if_failed(
ldap_set_option(_ldap.get(), LDAP_OPT_RESTART, LDAP_OPT_ON), // Retry on EINTR, rather than return error.
"ldap_set_option restart",
*this,
LDAP_OPT_SUCCESS);
throw_if_failed(
// Chasing referrals with this setup results in libldap crashing.
ldap_set_option(_ldap.get(), LDAP_OPT_REFERRALS, LDAP_OPT_OFF),
"ldap_set_option no referrals",
*this,
LDAP_OPT_SUCCESS);
Sockbuf* sb;
throw_if_failed(ldap_get_option(_ldap.get(), LDAP_OPT_SOCKBUF, &sb), "ldap_get_option", *this, LDAP_OPT_SUCCESS);
mylog.trace("constructor adding Sockbuf_IO");
throw_if_failed(
ber_sockbuf_add_io(sb, const_cast<Sockbuf_IO*>(&seastar_sbio), LBER_SBIOD_LEVEL_PROVIDER, this),
"ber_sockbuf_add_io",
*this);
mylog.trace("constructor done");
}
future<> ldap_connection::close() {
if (_status == status::down) {
mylog.error("close called while connection is down");
return make_exception_future<>(std::runtime_error("double close() of ldap_connection"));
}
_ldap.reset(); // Sends one last message to the server before reclaiming memory.
return when_all(
_read_consumer.finally([this] { return _input_stream.close(); })
.handle_exception([] (std::exception_ptr ep) {
mylog.error("Seastar input stream closing failed: {}", ep);
}),
_outstanding_write.finally([this] { return _output_stream.close(); })
.handle_exception([] (std::exception_ptr ep) {
mylog.error("Seastar output stream closing failed: {}", ep);
})
).discard_result().then([this] {
shutdown();
return make_ready_future<>();
});
}
future<ldap_msg_ptr> ldap_connection::await_result(int msgid) {
mylog.trace("await_result({})", msgid);
if (_status != status::up) {
mylog.error("await_result({}) error: connection is not up", msgid);
ldap_abandon_ext(get_ldap(), msgid, /*sctrls=*/nullptr, /*cctrls=*/nullptr);
return make_exception_future<ldap_msg_ptr>(std::runtime_error("ldap_connection status set to error"));
}
try {
return _msgid_to_promise[msgid].get_future();
} catch (...) {
auto ex = std::current_exception();
mylog.error("await_result({}) error: {}", msgid, ex);
// Tell LDAP to abandon this msgid, since we failed to register it.
ldap_abandon_ext(get_ldap(), msgid, /*sctrls=*/nullptr, /*cctrls=*/nullptr);
return make_exception_future<ldap_msg_ptr>(ex);
}
}
future<ldap_msg_ptr> ldap_connection::await_result(int status, int msgid) {
mylog.trace("await_result({}, {})", status, msgid);
if (status == LDAP_SUCCESS) {
return await_result(msgid);
} else {
const char* err = ldap_err2string(status);
mylog.trace("await_result({}, {}) reporting error {}", status, msgid, err);
return make_exception_future<ldap_msg_ptr>(
std::runtime_error(fmt::format("ldap operation error: {}", err)));
}
}
future<ldap_msg_ptr> ldap_connection::simple_bind(const char *who, const char *passwd) {
mylog.trace("simple_bind({})", who);
if (_status != status::up) {
mylog.error("simple_bind({}) punting, connection down", who);
return make_exception_future<ldap_msg_ptr>(
std::runtime_error("bind operation attempted on a closed ldap_connection"));
}
const int msgid = ldap_simple_bind(get_ldap(), who, passwd);
if (msgid == -1) {
const auto err = get_error();
mylog.error("ldap simple bind error: {}", err);
return make_exception_future<ldap_msg_ptr>(
std::runtime_error(fmt::format("ldap simple bind error: {}", err)));
} else {
mylog.trace("simple_bind: msgid {}", msgid);
return await_result(msgid);
}
}
future<ldap_msg_ptr> ldap_connection::search(
char *base,
int scope,
char *filter,
char *attrs[],
int attrsonly,
LDAPControl **serverctrls,
LDAPControl **clientctrls,
struct timeval *timeout,
int sizelimit) {
mylog.trace("search");
int msgid;
if (_status != status::up) {
mylog.error("search punting, connection down");
return make_exception_future<ldap_msg_ptr>(
std::runtime_error("search operation attempted on a closed ldap_connection"));
}
const int status = ldap_search_ext(
get_ldap(), base, scope, filter, attrs, attrsonly, serverctrls, clientctrls, timeout, sizelimit, &msgid);
return await_result(status, msgid);
}
sstring ldap_connection::get_error() const {
int result_code;
int status = ldap_get_option(get_ldap(), LDAP_OPT_RESULT_CODE, reinterpret_cast<void*>(&result_code));
if (status != LDAP_OPT_SUCCESS) {
mylog.error("ldap_get_option returned {}", status);
return "error description unavailable";
}
return ldap_err2string(result_code);
}
int ldap_connection::sbi_ctrl(Sockbuf_IO_Desc* sid, int opt, void* arg) noexcept {
mylog.debug("sbi_ctrl({}/{}, {}, {})", static_cast<void*>(sid), sid->sbiod_pvt, opt, arg);
auto conn = connection(sid);
switch (opt) {
case LBER_SB_OPT_DATA_READY:
return !conn->_read_buffer.empty()
|| conn->_status != status::up; // Let sbi_read proceed and report status; otherwise, LDAP loops forever.
case LBER_SB_OPT_GET_FD:
if (conn->_status == status::down) {
errno = ENOTCONN;
return -1;
}
*reinterpret_cast<ber_socket_t*>(arg) = conn->_fd.get();
return 1;
}
return 0;
}
ber_slen_t ldap_connection::sbi_read(Sockbuf_IO_Desc* sid, void* buffer, ber_len_t size) noexcept {
mylog.trace("sbi_read {}/{}", static_cast<const void*>(sid), static_cast<const void*>(sid->sbiod_pvt));
try {
return connection(sid)->read(reinterpret_cast<char*>(buffer), size);
} catch (...) {
mylog.error("Unexpected error while reading: {}", std::current_exception());
return failure_code;
}
}
ber_slen_t ldap_connection::sbi_write(Sockbuf_IO_Desc* sid, void* buffer, ber_len_t size) noexcept {
mylog.trace("sbi_write {}/{}", static_cast<const void*>(sid), static_cast<const void*>(sid->sbiod_pvt));
try {
return connection(sid)->write(reinterpret_cast<const char*>(buffer), size);
} catch (...) {
mylog.error("Unexpected error while writing: {}", std::current_exception());
return failure_code;
}
}
int ldap_connection::sbi_close(Sockbuf_IO_Desc* sid) noexcept {
mylog.debug("sbi_close {}/{}", static_cast<const void*>(sid), static_cast<const void*>(sid->sbiod_pvt));
// Leave actual closing to the owner of *this. Note sbi_close() will be invoked during
// ldap_unbind(), which also calls sbi_write() to convey one last message to the server. We
// remain open here, to try to communicate that message.
return success_code;
}
const Sockbuf_IO ldap_connection::seastar_sbio{
// Strictly speaking, designated initializers like this are not in the standard, but they're
// supported by both major compilers we use.
.sbi_setup = &ssbi_setup,
.sbi_remove = &ssbi_remove,
.sbi_ctrl = &ldap_connection::sbi_ctrl,
.sbi_read = &ldap_connection::sbi_read,
.sbi_write = &ldap_connection::sbi_write,
.sbi_close = &ldap_connection::sbi_close
};
void ldap_connection::read_ahead() {
if (_read_in_progress) { // Differs from _read_consumer.available(), because handle_exception adds a continuation.
mylog.warn("read_ahead called while a prior Seastar read is already in progress");
return;
}
if (_input_stream.eof()) {
mylog.error("read_ahead encountered EOF");
set_status(status::eof);
return;
}
mylog.trace("read_ahead");
_read_in_progress = true;
mylog.trace("read_ahead invoking socket read");
_read_consumer = _input_stream.read().then([this] (temporary_buffer<char> b) {
if (b.empty()) {
mylog.debug("read_ahead received empty buffer; assuming EOF");
set_status(status::eof);
return;
}
mylog.trace("read_ahead received data of size {}", b.size());
if (!_read_buffer.empty()) { // Shouldn't happen; read_ahead's purpose is to replenish empty _read_buffer.
mylog.error("read_ahead dropping {} unconsumed bytes", _read_buffer.size());
}
_read_buffer = std::move(b);
_read_in_progress = false;
poll_results();
}).handle_exception([this] (std::exception_ptr ep) {
mylog.error("Seastar read failed: {}", ep);
set_status(status::err);
});
mylog.trace("read_ahead done");
}
ber_slen_t ldap_connection::write(char const* b, ber_len_t size) {
mylog.trace("write({})", size);
switch (_status) {
case status::err:
mylog.trace("write({}) reporting error", size);
errno = ECONNRESET;
return -1;
case status::down:
mylog.trace("write({}) invoked after shutdown", size);
errno = ENOTCONN;
return -1;
case status::up:
case status::eof:
; // Proceed.
}
_outstanding_write = _outstanding_write.then([this, buf = temporary_buffer(b, size)] () mutable {
if (_status != status::up) {
return make_ready_future<>();
}
mylog.trace("write invoking socket write");
return _output_stream.write(std::move(buf)).then([this] {
// Sockbuf_IO doesn't seem to have the notion of flushing the stream, so we flush after
// every write.
mylog.trace("write invoking flush");
return _output_stream.flush();
}).handle_exception([this] (std::exception_ptr ep) {
mylog.error("Seastar write failed: {}", ep);
set_status(status::err);
});
});
mylog.trace("write({}) done, status={}", size, _status);
return _status == status::up ? size : -1; // _status can be err here if _outstanding_write threw.
}
ber_slen_t ldap_connection::read(char* b, ber_len_t size) {
mylog.trace("read({})", size);
switch (_status) {
case status::eof:
mylog.trace("read({}) reporting eof", size);
return 0;
case status::err:
mylog.trace("read({}) reporting error", size);
errno = ECONNRESET;
return -1;
case status::down:
mylog.trace("read({}) invoked after shutdown", size);
errno = ENOTCONN;
return -1;
case status::up:
; // Proceed.
}
if (_read_buffer.empty()) { // Can happen because libldap doesn't always wait for data to be ready.
mylog.trace("read({}) found empty read buffer", size);
// Don't invoke read_ahead() here; it was already invoked as soon as _read_buffer was
// drained. In fact, its Seastar read might have actually completed, and the buffer is
// about to be filled by a waiting continuation. We DON'T want another read_ahead() before
// that data is consumed.
errno = EWOULDBLOCK;
return 0;
}
const auto byte_count = std::min(_read_buffer.size(), size);
std::copy_n(_read_buffer.begin(), byte_count, b);
_read_buffer.trim_front(byte_count);
if (_read_buffer.empty()) {
mylog.trace("read({}) replenishing buffer", size);
read_ahead();
}
mylog.trace("read({}) returning {}", size, byte_count);
return byte_count;
}
void ldap_connection::shutdown() {
mylog.trace("shutdown");
set_status(status::down);
mylog.trace("shutdown: shutdown input");
_socket.shutdown_input();
mylog.trace("shutdown: shutdown output");
_socket.shutdown_output();
mylog.trace("shutdown done");
}
void ldap_connection::poll_results() {
mylog.trace("poll_results");
if (!_ldap) { // Could happen during close(), which unbinds.
mylog.debug("poll_results: _ldap is null, punting");
return;
}
if (_currently_polling) {
// This happens when ldap_result() calls read_ahead() and runs its inner continuation immediately.
mylog.debug("poll_results: _currently_polling somewhere up the call-stack, punting");
return;
}
// Ensure that _currently_polling is true until we return.
class flag_guard {
bool& _flag;
public:
flag_guard(bool& flag) : _flag(flag) { flag = true; }
~flag_guard() { _flag = false; }
} guard(_currently_polling);
LDAPMessage *result;
while (!_read_buffer.empty() && _status == status::up) {
static timeval zero_duration{};
mylog.trace("poll_results: {} in buffer, invoking ldap_result", _read_buffer.size());
const int status = ldap_result(get_ldap(), LDAP_RES_ANY, /*all=*/1, &zero_duration, &result);
if (status > 0) {
ldap_msg_ptr result_ptr(result);
const int id = ldap_msgid(result);
mylog.trace("poll_results: ldap_result returned status {}, id {}", status, id);
const auto found = _msgid_to_promise.find(id);
if (found == _msgid_to_promise.end()) {
mylog.error("poll_results: got valid result for unregistered id {}, dropping it", id);
ldap_msgfree(result);
} else {
found->second.set_value(std::move(result_ptr));
_msgid_to_promise.erase(found);
}
} else if (status < 0) {
mylog.error("poll_results: ldap_result returned status {}, error: {}", status, get_error());
set_status(status::err);
}
}
mylog.trace("poll_results done");
}
void ldap_connection::set_status(ldap_connection::status s) {
_status = s;
if (s != status::up) {
mylog.trace("set_status: signal result-waiting futures");
for (auto& e : _msgid_to_promise) {
e.second.set_exception(std::runtime_error("ldap_connection status set to error"));
}
_msgid_to_promise.clear();
}
}
ldap_reuser::ldap_reuser(sequential_producer<ldap_reuser::conn_ptr>::factory_t&& f)
: _make_conn(std::move(f)), _reaper(now()) {
}
void ldap_reuser::reap(conn_ptr& conn) {
if (!conn) {
return;
}
if (auto p = conn.release()) { // Safe to close, other fibers are done with it.
_reaper = _reaper.then([p = std::move(p)] () mutable {
return p->close().then_wrapped([p = std::move(p)] (future<> fut) {
if (fut.failed()) {
mylog.warn("failure closing dead ldap_connection: {}", fut.get_exception());
}
/* p disposed here */
});
});
}
}
future<> ldap_reuser::stop() {
reap(_conn);
return std::move(_reaper);
}