transport: fix connection code to consume only initially taken semaphore units
The connection's cpu_concurrency_t struct tracks the state of a connection to manage the admission of new requests and prevent CPU overload during connection storms. When a connection holds units (only 0 or 1 are allowed), it is considered to be in the "CPU state" and contributes to the concurrency limits used when accepting new connections.

The bug stems from the fact that `counted_data_source_impl::get` and `counted_data_sink_impl::put` calls can interleave during execution. This occurs because of `should_parallelize` and `_ready_to_respond`, the latter being a future chain that can run in the background while requests are being read. Consequently, while reading request (N), the system may concurrently be writing the response for request (N-1) on the same connection.

This interleaving allows `return_all()` to be called twice before the subsequent `consume_units()` is invoked. While the second `return_all()` call correctly returns 0 units, the matching `consume_units()` call would mistakenly take an extra unit from the semaphore. Over time, a connection blocked on a read operation could end up holding an unreturned semaphore unit. If this pattern repeats across multiple connections, the semaphore units are eventually depleted, preventing the server from accepting any new connections.

The fix ensures that we always consume the exact number of units that were previously returned. With this change, interleaved operations behave as follows:

    get() return_all    -- returns 1 unit
    put() return_all    -- returns 0 units
    get() consume_units -- takes back 1 unit
    put() consume_units -- takes back 0 units

Logically, the networking phase ends when the first network operation concludes. More importantly, when a network operation starts, we no longer hold any units. Other solutions are possible, but the chosen one seems to be the simplest and safest to backport.

Fixes SCYLLADB-485
@@ -29,9 +29,10 @@ class counted_data_source_impl : public data_source_impl {
         if (_cpu_concurrency.stopped) {
             return fun();
         }
+        size_t units = _cpu_concurrency.units.count();
         _cpu_concurrency.units.return_all();
-        return fun().finally([this] () {
-            _cpu_concurrency.units.adopt(consume_units(_cpu_concurrency.semaphore, 1));
+        return fun().finally([this, units] () {
+            _cpu_concurrency.units.adopt(consume_units(_cpu_concurrency.semaphore, units));
         });
     };
 public:
@@ -57,9 +58,10 @@ class counted_data_sink_impl : public data_sink_impl {
         if (_cpu_concurrency.stopped) {
             return fun();
         }
+        size_t units = _cpu_concurrency.units.count();
         _cpu_concurrency.units.return_all();
-        return fun().finally([this] () {
-            _cpu_concurrency.units.adopt(consume_units(_cpu_concurrency.semaphore, 1));
+        return fun().finally([this, units] () {
+            _cpu_concurrency.units.adopt(consume_units(_cpu_concurrency.semaphore, units));
         });
     };
 public:
@@ -72,10 +74,10 @@ public:
         if (_cpu_concurrency.stopped) {
             return _ds.put(std::move(data));
         }

+        size_t units = _cpu_concurrency.units.count();
         _cpu_concurrency.units.return_all();
-        return _ds.put(std::move(data)).finally([this] {
-            _cpu_concurrency.units.adopt(consume_units(_cpu_concurrency.semaphore, 1));
+        return _ds.put(std::move(data)).finally([this, units] {
+            _cpu_concurrency.units.adopt(consume_units(_cpu_concurrency.semaphore, units));
         });
     }
     virtual future<> flush() override {