transport: fix connection code to consume only initially taken semaphore units

The per-connection cpu_concurrency_t struct tracks connection state to
manage the admission of new requests and to prevent CPU overload during
connection storms. When a connection holds semaphore units (it may hold only
0 or 1), it is considered to be in the "CPU state" and counts toward the
concurrency limit used when accepting new connections.

The bug stems from the fact that `counted_data_source_impl::get` and
`counted_data_sink_impl::put` calls can interleave during execution. This
occurs because of `should_parallelize` and `_ready_to_respond`, the latter being
a future chain that can run in the background while requests are being read.
Consequently, while reading request (N), the system may concurrently be
writing the response for request (N-1) on the same connection.

This interleaving allows `return_all()` to be called twice before the
subsequent `consume_units()` is invoked. While the second `return_all()` call
correctly returns 0 units, the matching `consume_units()` call would
mistakenly take an extra unit from the semaphore. Over time, a connection
blocked on a read operation could end up holding an unreturned semaphore
unit. If this pattern repeats across multiple connections, the semaphore
units are eventually depleted, preventing the server from accepting any
new connections.

The fix ensures that we always consume the exact number of units that were
previously returned. With this change, interleaved operations behave as
follows:

get() return_all     — returns 1 unit
put() return_all     — returns 0 units
get() consume_units  — takes back 1 unit
put() consume_units  — takes back 0 units

Logically, the networking phase ends when the first network operation
completes. More importantly, once a network operation starts, the
connection no longer holds any units.

Other solutions are possible but the chosen one seems to be the
simplest and safest to backport.

Fixes SCYLLADB-485
Author: Marcin Maliszkiewicz
Date:   2026-02-03 14:51:24 +01:00
Parent: f45465b9f6
Commit: 0376d16ad3


@@ -29,9 +29,10 @@ class counted_data_source_impl : public data_source_impl {
         if (_cpu_concurrency.stopped) {
             return fun();
         }
+        size_t units = _cpu_concurrency.units.count();
         _cpu_concurrency.units.return_all();
-        return fun().finally([this] () {
-            _cpu_concurrency.units.adopt(consume_units(_cpu_concurrency.semaphore, 1));
+        return fun().finally([this, units] () {
+            _cpu_concurrency.units.adopt(consume_units(_cpu_concurrency.semaphore, units));
         });
     };
 public:
public:
@@ -57,9 +58,10 @@ class counted_data_sink_impl : public data_sink_impl {
         if (_cpu_concurrency.stopped) {
             return fun();
         }
+        size_t units = _cpu_concurrency.units.count();
         _cpu_concurrency.units.return_all();
-        return fun().finally([this] () {
-            _cpu_concurrency.units.adopt(consume_units(_cpu_concurrency.semaphore, 1));
+        return fun().finally([this, units] () {
+            _cpu_concurrency.units.adopt(consume_units(_cpu_concurrency.semaphore, units));
         });
     };
 public:
@@ -72,10 +74,10 @@ public:
         if (_cpu_concurrency.stopped) {
             return _ds.put(std::move(data));
         }
+        size_t units = _cpu_concurrency.units.count();
         _cpu_concurrency.units.return_all();
-        return _ds.put(std::move(data)).finally([this] {
-            _cpu_concurrency.units.adopt(consume_units(_cpu_concurrency.semaphore, 1));
+        return _ds.put(std::move(data)).finally([this, units] {
+            _cpu_concurrency.units.adopt(consume_units(_cpu_concurrency.semaphore, units));
         });
     }
     virtual future<> flush() override {