Files
scylladb/test/lib/proc_utils.cc
Ernest Zaslavsky 834eed10d9 test: fix use-after-free in start_docker_service retry path
start_docker_service is a coroutine that took docker_args and
image_args by const reference. Its caller start_fake_gcs_server
is a regular function that passes temporaries (initializer lists)
and immediately returns a future. The temporaries are destroyed
when the caller returns, leaving the coroutine holding dangling
references.

On the first loop iteration this works by luck (memory not yet
reused), but on retry (after "address already in use") the
params.append_range(image_args) call reads freed memory, causing a
use-after-free that manifests as std::bad_alloc or broken_promise
in non-sanitizer builds.

Fix by taking docker_args and image_args by value so the coroutine
frame owns the vectors for its entire lifetime.

Fixes: https://scylladb.atlassian.net/browse/SCYLLADB-2003

Closes scylladb/scylladb#29932
2026-05-18 10:50:19 +03:00

364 lines
12 KiB
C++

/*
* Copyright (C) 2025-present ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1
*/
#include "proc_utils.hh"
#include <iostream>
#include <ranges>
#include <regex>
#include <seastar/core/seastar.hh>
#include <seastar/core/gate.hh>
#include <seastar/core/with_timeout.hh>
#include <seastar/core/future-util.hh>
#include <seastar/core/sleep.hh>
#include <seastar/net/inet_address.hh>
#include <seastar/net/api.hh>
#include <seastar/util/log.hh>
#include "utils/overloaded_functor.hh"
#include "test_utils.hh"
using namespace seastar;
// Private state of a process_fixture: the spawned child process together
// with the gate that process_fixture::create() holds while output
// consumers are running and that process_fixture::wait() closes.
class tests::proc::process_fixture::impl {
public:
    experimental::process _process;
    gate _gate;

    // Takes ownership of an already spawned process.
    impl(experimental::process proc)
        : _process{std::move(proc)}
    {}
};
tests::proc::process_fixture::process_fixture(std::unique_ptr<impl> i)
: _impl(std::move(i))
{}
// Move construction and destruction are defaulted here, in the translation
// unit that defines `impl`.
// NOTE(review): presumably required because `_impl` is a std::unique_ptr<impl>
// and `impl` is only forward-declared in the header -- confirm against proc_utils.hh.
tests::proc::process_fixture::process_fixture(process_fixture&&) noexcept = default;
tests::proc::process_fixture::~process_fixture() = default;
// Null-terminated array of "NAME=value" strings describing the current
// process environment; walked by process_fixture::create() when the child
// should inherit it.
extern char **environ;
// Spawn `exec` as a child process and wrap it in a process_fixture.
//
// exec           - path of the program to spawn.
// args           - copied verbatim into the spawn argv.
// env            - environment entries appended after the inherited ones (if any).
// stdout_handler - optional handler for the child's stdout: nothing
//                  (std::monostate), a per-line handler, or a raw stream handler.
// stderr_handler - same as stdout_handler, for the child's stderr.
// inherit_env    - when true, the current process environment is copied into
//                  the child's environment before `env` is appended.
//
// Returns the fixture owning the spawned process. Output consumers are started
// in the background and tracked by the fixture's gate.
future<tests::proc::process_fixture> tests::proc::process_fixture::create(const std::filesystem::path& exec
, const std::vector<std::string>& args
, const std::vector<std::string>& env
, handler_type stdout_handler
, handler_type stderr_handler
, bool inherit_env
) {
experimental::spawn_parameters params;
if (inherit_env) {
// copy existing env
for (auto** p = environ; *p != nullptr; ++p) {
params.env.emplace_back(*p);
}
}
// Note: the reference parameters are only read up to (and including) the
// spawn_process call expression below.
std::copy(args.begin(), args.end(), std::back_inserter(params.argv));
std::copy(env.begin(), env.end(), std::back_inserter(params.env));
process_fixture res(std::make_unique<impl>(co_await experimental::spawn_process(exec, params)));
// proc and gate refer to members of the heap-allocated impl, so they stay
// valid when `res` itself is moved out at the end.
auto& proc = res._impl->_process;
auto& gate = res._impl->_gate;
// Adapts a per-line handler into a buffer handler: incoming data is
// accumulated in `str`, every complete line is passed to `handler`, and a
// trailing partial line is kept for the next call.
auto wrap_line_handler = [](line_handler handler) {
return [handler = std::move(handler), str = std::string{}](buffer_type buf) mutable -> future<consumption_result<char>> {
auto off = str.size();
str.reserve(off + buf.size());
str.append(buf.begin(), buf.end());
// An empty buffer makes everything accumulated so far eligible for
// dispatch (presumably this signals end of stream -- confirm); otherwise
// only data up to the last newline is dispatched.
auto i = buf.empty() ? str.size() : str.rfind('\n');
if (i == std::string::npos) {
// No complete line yet; keep accumulating.
co_return continue_consuming{};
}
auto range_view = std::string_view(str.data(), i);
for (auto v : std::views::split(range_view, '\n')) {
auto res = co_await handler(std::string_view(v));
// Propagate a stop request from the line handler immediately.
if (std::holds_alternative<stop_consuming<char>>(res.get())) {
co_return res;
}
}
// Also drop the newline that terminated the last dispatched line.
if (i < str.size() && str[i] == '\n') {
++i;
}
str.erase(str.begin(), str.begin() + i);
co_return continue_consuming{};
};
};
// Normalizes `handler` to a stream_handler and, if one was supplied, starts
// consuming the stream returned by `func` (process::cout or process::cerr)
// in the background.
auto wrap_handler = [&](handler_type handler, input_stream<char> (experimental::process::*func)()) {
auto h = std::visit(overloaded_functor(
[&](std::monostate) -> stream_handler { return {}; },
[&](line_handler&& h) -> stream_handler { return wrap_line_handler(std::move(h)); },
[&](stream_handler&& h) -> stream_handler { return std::move(h); }
), std::move(handler));
if (h) {
auto g = gate.hold();
auto strm = std::make_unique<input_stream<char>>(std::invoke(func, proc));
auto& sr = *strm;
// The consume future is intentionally detached; the gate holder and the
// stream object are kept alive by the continuation until consumption ends,
// so wait() (which closes the gate) cannot finish before that.
(void)sr.consume(std::move(h)).finally([g = std::move(g), strm = std::move(strm)] {});
}
};
wrap_handler(std::move(stdout_handler), &experimental::process::cout);
wrap_handler(std::move(stderr_handler), &experimental::process::cerr);
co_return res;
}
// Build a line handler that writes each received line to `os`, followed by
// std::endl, and always asks for more input. The stream is captured by
// address, so it must outlive the returned handler.
tests::proc::process_fixture::line_handler tests::proc::process_fixture::create_copy_handler(std::ostream& os) {
    return [sink = &os](std::string_view line) mutable -> future<consumption_result<char>> {
        *sink << line;
        *sink << std::endl;
        co_return continue_consuming{};
    };
}
// Wait for the process to finish. The gate is closed first, so all
// background output consumers started by create() are done before the
// process's wait status is collected and returned.
future<experimental::process::wait_status> tests::proc::process_fixture::wait() {
    co_await _impl->_gate.close();
    auto status = co_await _impl->_process.wait();
    co_return status;
}
// Forward a terminate request to the underlying process object.
void tests::proc::process_fixture::terminate() {
    auto& child = _impl->_process;
    child.terminate();
}
// Forward a kill request to the underlying process object.
void tests::proc::process_fixture::kill() {
    auto& child = _impl->_process;
    child.kill();
}
// Input stream obtained from the underlying process's cout().
input_stream<char> tests::proc::process_fixture::cout() {
    auto& child = _impl->_process;
    return child.cout();
}
// Input stream obtained from the underlying process's cerr().
input_stream<char> tests::proc::process_fixture::cerr() {
    auto& child = _impl->_process;
    return child.cerr();
}
// Output stream obtained from the underlying process's cin().
output_stream<char> tests::proc::process_fixture::cin() {
    auto& child = _impl->_process;
    return child.cin();
}
namespace fs = std::filesystem;
// Search for a file called `name` and return the first match.
//
// Directories are probed in this order: `path_prepend`, then the system
// directories (the entries of $PATH, $PWD, /usr/bin, /usr/local/bin -- computed
// once, on first call), then `path_append`. A candidate matches when it exists
// and is not a directory.
//
// Returns the full path of the first match, or an empty path if none is found.
// Candidates whose status cannot be queried (e.g. permission denied) are
// skipped instead of aborting the whole search with a filesystem_error.
fs::path tests::proc::find_file_in_path(std::string_view name,
        const std::vector<fs::path>& path_prepend,
        const std::vector<fs::path>& path_append
) {
    static auto get_var = [](const char* name) {
        auto res = std::getenv(name);
        return res ? std::string(res) : std::string{};
    };
    static const std::vector<fs::path> system_paths = [] {
        std::vector<fs::path> res;
        // std::views::concat not yet in clang on my fedora.
        for (auto p : std::views::split(get_var("PATH"), ':')) {
            res.emplace_back(std::string_view(p));
        }
        res.emplace_back(get_var("PWD"));
        res.emplace_back("/usr/bin");
        res.emplace_back("/usr/local/bin");
        return res;
    }();
    // Iterate over pointers: a braced list of the vectors themselves would
    // deep-copy all three of them on every call.
    for (const auto* paths : { &path_prepend, &system_paths, &path_append }) {
        for (const auto& p : *paths) {
            auto test = p / name;
            // Non-throwing overloads: on error both report false, so an
            // inaccessible candidate is simply not a match.
            std::error_code ec;
            if (fs::exists(test, ec) && !fs::is_directory(test, ec)) {
                return test;
            }
        }
    }
    return fs::path();
}
using namespace std::chrono_literals;
using namespace std::string_literals;
// Start a service inside a docker (or podman) container and wait until it
// accepts TCP connections on 127.0.0.1.
//
// name          - used as prefix of the generated container name and in log messages.
// image         - image argument passed to `run`, placed after `docker_args`.
// stdout_parse  - optional callback fed every stdout line while it keeps returning
//                 service_parse_state::cont; `success` marks the stream as ready,
//                 `failed` is treated as "address in use" and triggers a retry with
//                 a fresh container. When empty, stdout counts as ready at once.
// stderr_parse  - same as stdout_parse, for stderr.
// docker_args   - extra arguments placed before the image.
// image_args    - extra arguments placed after the image.
// service_port  - 0 publishes all exposed ports ("-P"); otherwise "-p <service_port>".
//
// Returns the fixture of the running container process and the port parsed from
// the `container port <name>` output.
//
// Fails with std::runtime_error if neither docker nor podman can be found, or
// with the last error seen once retrying is not applicable or `max_retries`
// has been reached.
future<std::tuple<tests::proc::process_fixture, int>> tests::proc::start_docker_service(
    std::string_view name,
    std::string_view image,
    parse_service_callback stdout_parse,
    parse_service_callback stderr_parse,
    std::vector<std::string> docker_args,
    std::vector<std::string> image_args,
    int service_port)
{
    // This is a coroutine: the string_view parameters are read again after
    // suspension points (retry iterations, log messages). Take owning copies
    // now, before the first co_await, so that a caller passing a temporary
    // string cannot leave us with dangling views.
    const std::string service_name(name);
    const std::string image_name(image);

    std::string container_name;
    auto exec = find_file_in_path("docker");
    if (exec.empty()) {
        exec = find_file_in_path("podman");
    }
    if (exec.empty()) {
        throw std::runtime_error("Could not find docker or podman.");
    }

    static int counter = 0;
    // Marker exception: the output parser reported a failure that is worth
    // retrying with a new container.
    struct in_use{};
    constexpr auto max_retries = 8;

    for (int retries = 0;; ++retries) {
        container_name = fmt::format("{}-{}-{}", service_name, ::getpid(), ++counter);

        // publish port ephemeral, allows parallel instances
        std::vector<std::string> params = {
            exec.string(),
            "run", "--rm",
            "--name", container_name,
        };
        if (service_port == 0) {
            params.emplace_back("-P");
        } else {
            params.emplace_back("-p");
            params.emplace_back(std::to_string(service_port));
        }
        params.append_range(docker_args);
        params.emplace_back(image_name);
        params.append_range(image_args);

        BOOST_TEST_MESSAGE(fmt::format("Will run {}", params));

        std::vector<std::string> env;
        int port = 0;

        // Builds the handler for one output stream plus a future that resolves
        // once the stream is considered "ready". Every line is always copied to
        // `out`; the user callback is consulted only until it returns a verdict.
        auto create_handler = [](auto h_in, std::ostream& out) {
            auto h = process_fixture::create_copy_handler(out);
            promise<> ready_promise;
            future<> f = ready_promise.get_future();
            if (h_in) {
                h = [state = service_parse_state::cont, ready_promise = std::move(ready_promise), h = std::move(h), h_in = std::move(h_in)](std::string_view line) mutable -> future<consumption_result<char>> {
                    if (state == service_parse_state::cont) {
                        switch (state = h_in(line)) {
                        case service_parse_state::success: ready_promise.set_value(); break;
                        case service_parse_state::failed: ready_promise.set_exception(in_use{}); break;
                        default:
                            break;
                        }
                    }
                    return h(line);
                };
            } else {
                ready_promise.set_value();
            }
            return std::make_tuple(std::move(h), std::move(f));
        };

        auto [out_h, out_fut] = create_handler(stdout_parse, std::cout);
        auto [err_h, err_fut] = create_handler(stderr_parse, std::cerr);

        auto ps = co_await process_fixture::create(exec
            , params
            , env
            , std::move(out_h)
            , std::move(err_h)
        );

        // From here on every failure must be recorded in `p` rather than
        // propagated, so that the container process is always terminated and
        // waited for below before this coroutine exits or retries.
        std::exception_ptr p;
        bool retry = false;

        try {
            BOOST_TEST_MESSAGE("Waiting for process to launch...");
            // arbitrary timeout of 120s for the server to make some output. Very generous.
            // but since we (maybe) run docker, and might need to pull image, this can take
            // some time if we're unlucky.
            auto [f1, f2] = co_await with_timeout(std::chrono::steady_clock::now() + 120s, when_all(std::move(out_fut), std::move(err_fut)));
            for (auto* f : {&f1, &f2}) {
                if (f->failed()) {
                    try {
                        f->get();
                    } catch (in_use&) {
                        retry = true;
                        p = std::current_exception();
                    } catch (...) {
                        if (!p) {
                            p = std::current_exception();
                        }
                    }
                }
            }
        } catch (...) {
            p = std::current_exception();
        }

        if (!p) {
            // query port
            promise<int> port_promise;
            auto port_future = port_promise.get_future();
            try {
                // Spawning the helper is inside the try block on purpose: if it
                // fails, the error is recorded and the container started above
                // still gets terminated and waited for.
                auto port_ps = co_await process_fixture::create(exec
                    , { "container", "port", container_name }
                    , {}
                    , [done = false, port_promise = std::move(port_promise)](std::string_view line) mutable -> future<consumption_result<char>> {
                        if (done) {
                            co_return continue_consuming{};
                        }
                        static std::regex port_ex(R"foo(\d+\/\w+ -> [\w+\.\[\]\:]+:(\d+))foo");
                        std::cmatch m;
                        if (std::regex_match(line.begin(), line.end(), m, port_ex)) {
                            BOOST_TEST_MESSAGE(fmt::format("Got port line: {}", line));
                            // Only the first matching line is used.
                            done = true;
                            port_promise.set_value(std::stoi(m[1].str()));
                        }
                        co_return continue_consuming{};
                    }
                    , process_fixture::create_copy_handler(std::cerr)
                );
                BOOST_TEST_MESSAGE("Waiting for port query...");
                co_await port_ps.wait();
                port = co_await std::move(port_future);
            } catch (...) {
                p = std::current_exception();
            }
        }

        auto backoff = 0ms;
        auto con_retry = 0;
        while (!p) {
            if (backoff > 0ms) {
                co_await seastar::sleep(backoff);
            }
            try {
                BOOST_TEST_MESSAGE(fmt::format("Attempting to connect to {} at {}", service_name, port));
                // TODO: seastar does not have a connect with timeout. That would be helpful here. But alas...
                co_await with_timeout(std::chrono::steady_clock::now() + 20s, seastar::connect(socket_address(net::inet_address("127.0.0.1"), port)));
                BOOST_TEST_MESSAGE(fmt::format("{} up and available", service_name)); // debug print. Why not.
            } catch (std::system_error&) {
                // Connection-level failure: retry the connect a few times, and
                // if that does not help allow a retry with a new container.
                retry = true;
                if (con_retry++ < max_retries) {
                    backoff = 100ms;
                    continue;
                }
                p = std::current_exception();
            } catch (...) {
                p = std::current_exception();
            }
            break;
        }

        if (p != nullptr) {
            BOOST_TEST_MESSAGE(fmt::format("Got exception starting {}: {}", service_name, p));
            ps.terminate();
            co_await ps.wait();
            if (!retry || retries >= max_retries) {
                std::rethrow_exception(p);
            }
            continue;
        }

        co_return std::make_tuple(std::move(ps), port);
    }
}