Files
scylladb/test/cluster/test_alternator_proxy_protocol.py
Avi Kivity 59f2a3ce72 test: test_alternator_proxy_protocol: wait for the node to report itself as serving
Use the new ServerUpState=SERVING mechanism to wait to the alternator
ports to be up, rather than relying on the default waiting for CQL,
which happens earlier and therefore opens a window where a connection to
the alternator ports will fail.
2026-01-27 17:25:59 +02:00

408 lines
16 KiB
Python

# Copyright 2025-present ScyllaDB
#
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
"""
Tests for Alternator (DynamoDB API) ports with proxy protocol v2 support.
These tests verify that when Scylla is configured with Alternator proxy protocol
ports, clients can connect using the PROXY protocol v2 header.
"""
import asyncio
import datetime
import hashlib
import hmac
import json
import logging
import socket
import ssl
import struct
from typing import Optional
import aiohttp
import pytest
from test.pylib.manager_client import ManagerClient
from test.pylib.rest_client import inject_error
from test.pylib.internal_types import ServerUpState
logger = logging.getLogger(__name__)
# Proxy protocol v2 signature (12 bytes)
PROXY_V2_SIGNATURE = b'\x0d\x0a\x0d\x0a\x00\x0d\x0a\x51\x55\x49\x54\x0a'
# Port numbers
ALTERNATOR_PORT = 8000
ALTERNATOR_HTTPS_PORT = 8043
ALTERNATOR_PROXY_PORT = 28000
ALTERNATOR_HTTPS_PROXY_PORT = 28043
def make_proxy_v2_header(src_addr: str, src_port: int, dst_addr: str, dst_port: int) -> bytes:
"""
Construct a proxy protocol v2 header for IPv4 TCP connections.
See: https://www.haproxy.org/download/1.8/doc/proxy-protocol.txt
"""
header = bytearray()
header.extend(PROXY_V2_SIGNATURE)
header.append(0x21) # Version 2, PROXY command
header.append(0x11) # AF_INET, STREAM/TCP
header.extend(struct.pack('!H', 12)) # Address block length
header.extend(socket.inet_aton(src_addr))
header.extend(socket.inet_aton(dst_addr))
header.extend(struct.pack('!H', src_port))
header.extend(struct.pack('!H', dst_port))
return bytes(header)
def sign_request(method: str, host: str, uri: str, headers: dict, body: str,
access_key: str, secret_key: str,
region: str = 'us-east-1', service: str = 'dynamodb') -> dict:
"""Sign an AWS request using Signature Version 4."""
t = datetime.datetime.now(datetime.UTC)
amz_date = t.strftime('%Y%m%dT%H%M%SZ')
date_stamp = t.strftime('%Y%m%d')
canonical_headers = f'host:{host}\nx-amz-date:{amz_date}\n'
signed_headers = 'host;x-amz-date'
payload_hash = hashlib.sha256(body.encode('utf-8')).hexdigest()
canonical_request = f'{method}\n{uri}\n\n{canonical_headers}\n{signed_headers}\n{payload_hash}'
algorithm = 'AWS4-HMAC-SHA256'
credential_scope = f'{date_stamp}/{region}/{service}/aws4_request'
string_to_sign = f'{algorithm}\n{amz_date}\n{credential_scope}\n{hashlib.sha256(canonical_request.encode("utf-8")).hexdigest()}'
def sign(key, msg):
return hmac.new(key, msg.encode('utf-8'), hashlib.sha256).digest()
k_date = sign(('AWS4' + secret_key).encode('utf-8'), date_stamp)
k_region = sign(k_date, region)
k_service = sign(k_region, service)
k_signing = sign(k_service, 'aws4_request')
signature = hmac.new(k_signing, string_to_sign.encode('utf-8'), hashlib.sha256).hexdigest()
new_headers = dict(headers)
new_headers['x-amz-date'] = amz_date
new_headers['Authorization'] = f'{algorithm} Credential={access_key}/{credential_scope}, SignedHeaders={signed_headers}, Signature={signature}'
return new_headers
class ProxyProtocolConnector(aiohttp.TCPConnector):
"""
Custom aiohttp connector that injects a proxy protocol v2 header
immediately after establishing the TCP connection, before any HTTP traffic.
"""
def __init__(self, proxy_src_addr: str, proxy_src_port: int,
ssl_context: Optional[ssl.SSLContext] = None, **kwargs):
# For TLS connections, we need to disable aiohttp's SSL handling
# so we can inject the proxy header before the TLS handshake.
# If ssl_context is None (HTTP), we must pass ssl=False to avoid default SSL validation.
ssl_param = ssl_context if ssl_context is not None else False
super().__init__(ssl=ssl_param, **kwargs)
self._proxy_src_addr = proxy_src_addr
self._proxy_src_port = proxy_src_port
self._custom_ssl_context = ssl_context
async def _create_connection(self, req, traces, timeout):
"""Override to inject proxy protocol header after TCP connect."""
# Get the target host and port
host = req.url.host
port = req.url.port or (443 if req.url.scheme == 'https' else 80)
# Create raw TCP connection
_, protocol = await self._loop.create_connection(
lambda: aiohttp.client_proto.ResponseHandler(self._loop),
host, port
)
# Send proxy protocol header immediately
proxy_header = make_proxy_v2_header(
self._proxy_src_addr, self._proxy_src_port, host, port
)
protocol.transport.write(proxy_header)
# If TLS is needed, upgrade the connection
if self._custom_ssl_context is not None:
transport = await self._loop.start_tls(
protocol.transport, protocol, self._custom_ssl_context,
server_hostname=host
)
protocol.transport = transport
return protocol
async def send_alternator_request(
host: str,
port: int,
operation: str,
request_body: dict,
use_ssl: bool = False,
proxy_src_addr: Optional[str] = None,
proxy_src_port: Optional[int] = None,
access_key: str = 'alternator',
secret_key: str = 'secret_pass'
) -> tuple[int, dict]:
"""
Send an Alternator request using aiohttp.
If proxy_src_addr and proxy_src_port are provided, a proxy protocol v2 header
is injected after TCP connect (and before TLS handshake for HTTPS).
Returns (status_code, response_body_dict).
"""
body = json.dumps(request_body)
headers = {
'Content-Type': 'application/x-amz-json-1.0',
'X-Amz-Target': f'DynamoDB_20120810.{operation}',
}
signed_headers = sign_request('POST', f'{host}:{port}', '/', headers, body, access_key, secret_key)
scheme = 'https' if use_ssl else 'http'
url = f'{scheme}://{host}:{port}/'
if proxy_src_addr is not None and proxy_src_port is not None:
# Use custom connector that injects proxy protocol header
ssl_context = None
if use_ssl:
ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
ssl_context.check_hostname = False
ssl_context.verify_mode = ssl.CERT_NONE
connector = ProxyProtocolConnector(proxy_src_addr, proxy_src_port, ssl_context=ssl_context)
else:
# Regular connection without proxy protocol
connector = aiohttp.TCPConnector(ssl=False) if use_ssl else None
async with aiohttp.ClientSession(connector=connector) as session:
async with session.post(url, headers=signed_headers, data=body) as resp:
resp_body = await resp.text()
return resp.status, json.loads(resp_body) if resp_body else {}
# Server configuration for all proxy protocol tests
ALTERNATOR_PROXY_SERVER_CONFIG = {
'alternator_port': ALTERNATOR_PORT,
'alternator_https_port': ALTERNATOR_HTTPS_PORT,
'alternator_port_proxy_protocol': ALTERNATOR_PROXY_PORT,
'alternator_https_port_proxy_protocol': ALTERNATOR_HTTPS_PROXY_PORT,
'alternator_write_isolation': 'only_rmw_uses_lwt',
'alternator_enforce_authorization': False,
'alternator_encryption_options': {
'certificate': 'conf/scylla.crt',
'keyfile': 'conf/scylla.key',
},
}
@pytest.fixture(scope="function")
async def alternator_proxy_server(manager: ManagerClient):
"""Fixture that creates a server with Alternator proxy protocol ports enabled.
Waits for SERVING state to ensure Alternator ports are ready.
"""
server = await manager.server_add(
config=ALTERNATOR_PROXY_SERVER_CONFIG,
expected_server_up_state=ServerUpState.SERVING
)
yield (server, manager)
@pytest.mark.asyncio
async def test_alternator_proxy_protocol_basic(alternator_proxy_server):
"""Test that connections through the Alternator proxy protocol port work."""
server, _ = alternator_proxy_server
status_code, response = await send_alternator_request(
host=server.ip_addr,
port=ALTERNATOR_PROXY_PORT,
operation='ListTables',
request_body={},
proxy_src_addr="203.0.113.42",
proxy_src_port=12345
)
assert status_code == 200, f"Expected status 200, got {status_code}: {response}"
assert 'TableNames' in response, f"Expected TableNames in response: {response}"
@pytest.mark.asyncio
async def test_alternator_proxy_protocol_multiple_connections(alternator_proxy_server):
"""Test multiple connections with different source addresses."""
server, _ = alternator_proxy_server
test_clients = [
("203.0.113.101", 10001),
("203.0.113.102", 10002),
("203.0.113.103", 10003),
]
for fake_src_addr, fake_src_port in test_clients:
status_code, response = await send_alternator_request(
host=server.ip_addr,
port=ALTERNATOR_PROXY_PORT,
operation='ListTables',
request_body={},
proxy_src_addr=fake_src_addr,
proxy_src_port=fake_src_port
)
assert status_code == 200, f"Expected status 200 for {fake_src_addr}, got {status_code}: {response}"
assert 'TableNames' in response, f"Expected TableNames in response for {fake_src_addr}: {response}"
@pytest.mark.asyncio
async def test_alternator_proxy_protocol_ssl_basic(alternator_proxy_server):
"""Test proxy protocol with TLS encryption."""
server, _ = alternator_proxy_server
status_code, response = await send_alternator_request(
host=server.ip_addr,
port=ALTERNATOR_HTTPS_PROXY_PORT,
operation='ListTables',
request_body={},
use_ssl=True,
proxy_src_addr="203.0.113.50",
proxy_src_port=55555
)
assert status_code == 200, f"Expected status 200, got {status_code}: {response}"
assert 'TableNames' in response, f"Expected TableNames in response: {response}"
@pytest.mark.asyncio
async def test_alternator_regular_port_still_works(alternator_proxy_server):
"""Test that the regular Alternator port still works when proxy protocol ports are configured."""
server, _ = alternator_proxy_server
status_code, response = await send_alternator_request(
host=server.ip_addr,
port=ALTERNATOR_PORT,
operation='ListTables',
request_body={}
)
assert status_code == 200, f"Expected status 200, got {status_code}: {response}"
assert 'TableNames' in response, f"Expected TableNames in response: {response}"
@pytest.mark.asyncio
async def test_alternator_proxy_header_to_regular_port_fails(alternator_proxy_server):
"""Test that sending a proxy protocol header to the regular port fails.
The regular port does not expect proxy protocol, so the server should
reject the connection or fail to parse the request.
"""
server, _ = alternator_proxy_server
# Sending proxy header to regular port should fail - the server will
# interpret the proxy header bytes as HTTP and fail to parse
with pytest.raises(Exception):
await send_alternator_request(
host=server.ip_addr,
port=ALTERNATOR_PORT,
operation='ListTables',
request_body={},
proxy_src_addr="203.0.113.42",
proxy_src_port=12345
)
@pytest.mark.asyncio
async def test_alternator_no_proxy_header_to_proxy_port_fails(alternator_proxy_server):
"""Test that sending a request without proxy header to the proxy port fails.
The proxy protocol port expects a proxy protocol header first, so sending
raw HTTP should fail.
"""
server, _ = alternator_proxy_server
# Sending without proxy header to proxy port should fail - the server
# expects a proxy protocol header first
with pytest.raises(Exception):
await send_alternator_request(
host=server.ip_addr,
port=ALTERNATOR_PROXY_PORT,
operation='ListTables',
request_body={}
# No proxy_src_addr/proxy_src_port means no proxy header
)
@pytest.mark.asyncio
@pytest.mark.parametrize("use_ssl", [False, True], ids=["http", "https"])
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_alternator_proxy_protocol_address_in_system_clients(alternator_proxy_server, use_ssl):
"""Test that the source address from the proxy protocol header is correctly
reported in system.clients.
Uses error injection to pause the Alternator request so we can query
system.clients while the request is in flight. Tests both HTTP and HTTPS
variants and verifies the ssl_enabled column is set correctly.
"""
server, manager = alternator_proxy_server
fake_src_addr = "203.0.113.100"
fake_src_port = 54321
port = ALTERNATOR_HTTPS_PROXY_PORT if use_ssl else ALTERNATOR_PROXY_PORT
async def wait_for_injection_waiting(injection_name: str, timeout: float = 10.0):
"""Poll until the injection has set 'waiting' parameter, indicating it's paused."""
deadline = asyncio.get_event_loop().time() + timeout
while asyncio.get_event_loop().time() < deadline:
injection_state = await manager.api.get_injection(server.ip_addr, injection_name)
# Each shard returns an object with 'enabled' and 'parameters' (array of {key, value})
for state in injection_state:
if state.get('enabled'):
for param in state.get('parameters', []):
if param.get('key') == 'waiting' and str(param.get('value')).lower() in ('true', '1'):
return
await asyncio.sleep(0.01)
raise TimeoutError(f"Injection {injection_name} did not reach waiting state within {timeout}s")
# Enable the error injection that pauses ListTables execution
async with inject_error(manager.api, server.ip_addr, "alternator_list_tables"):
# Start the request in the background - it will pause at the injection point
request_task = asyncio.create_task(send_alternator_request(
host=server.ip_addr,
port=port,
operation='ListTables',
request_body={},
use_ssl=use_ssl,
proxy_src_addr=fake_src_addr,
proxy_src_port=fake_src_port
))
def log_task_error(t):
if t.done() and t.exception():
print(f"DEBUG: Request task failed with: {t.exception()}")
request_task.add_done_callback(log_task_error)
# Wait until the injection has paused the request
await wait_for_injection_waiting("alternator_list_tables")
# Query system.clients while the request is paused
cql = manager.get_cql()
rows = list(cql.execute(
f"SELECT address, port, ssl_enabled FROM system.clients WHERE address = '{fake_src_addr}' ALLOW FILTERING"
))
# Send message to release the injection and let the request complete
await manager.api.message_injection(server.ip_addr, "alternator_list_tables")
# Wait for the request to complete
status_code, response = await request_task
# Verify the request succeeded
assert status_code == 200, f"Expected status 200, got {status_code}: {response}"
assert 'TableNames' in response, f"Expected TableNames in response: {response}"
# Verify we found the fake source address in system.clients
assert len(rows) > 0, f"Expected to find connection from {fake_src_addr} in system.clients"
matching_rows = [row for row in rows if row.port == fake_src_port]
assert len(matching_rows) > 0, \
f"Expected to find port {fake_src_port} in system.clients, got {[row.port for row in rows]}"
# Verify ssl_enabled matches whether we used SSL
assert matching_rows[0].ssl_enabled == use_ssl, \
f"Expected ssl_enabled={use_ssl}, got {matching_rows[0].ssl_enabled}"