mirror of
https://github.com/scylladb/scylladb.git
synced 2026-04-21 09:00:35 +00:00
scylla_cluster: Use thread safe future signalling
This commit is contained in:
@@ -747,6 +747,8 @@ class ScyllaServer:
|
||||
self.notify_socket = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM | socket.SOCK_CLOEXEC)
|
||||
self.notify_socket.bind(str(self.notify_socket_path))
|
||||
self._received_serving = False
|
||||
loop = asyncio.get_running_loop()
|
||||
|
||||
def poll_status(s: socket.socket, f: asyncio.Future, logger: Union[logging.Logger, logging.LoggerAdapter]):
|
||||
# Try to read all available messages from the socket
|
||||
while True:
|
||||
@@ -756,7 +758,7 @@ class ScyllaServer:
|
||||
message = data.decode('utf-8', errors='replace')
|
||||
if 'STATUS=serving' in message:
|
||||
logger.debug("Received sd_notify 'serving' message")
|
||||
f.set_result(True)
|
||||
loop.call_soon_threadsafe(f.set_result, True)
|
||||
return
|
||||
if 'STATUS=entering maintenance mode' in message:
|
||||
logger.debug("Receive sd_notify 'entering maintenance mode'")
|
||||
@@ -766,9 +768,9 @@ class ScyllaServer:
|
||||
except Exception as e:
|
||||
logger.debug("Error reading from notify socket: %s", e)
|
||||
break
|
||||
f.set_result(False)
|
||||
loop.call_soon_threadsafe(f.set_result, False)
|
||||
|
||||
self.serving_signal = asyncio.get_running_loop().create_future()
|
||||
self.serving_signal = loop.create_future()
|
||||
t = threading.Thread(target=poll_status, args=[self.notify_socket, self.serving_signal, self.logger], daemon=True)
|
||||
t.start()
|
||||
|
||||
|
||||
Reference in New Issue
Block a user