Files
scylladb/docs/dev/error_injection_events.md
2026-02-18 14:06:41 +00:00

4.3 KiB

Error Injection Event Stream Implementation

Overview

This implementation adds Server-Sent Events (SSE) support for error injection points, allowing tests to wait for injections to be triggered without log parsing.

Architecture

Backend (C++)

1. Event Notification System (utils/error_injection.hh)

  • Callback Type: error_injection_event_callback - function signature: void(std::string_view injection_name, std::string_view injection_type)
  • Storage: Thread-local vector of callbacks (_event_callbacks)
  • Notification: When any inject() method is called, notify_event() triggers all registered callbacks
  • Thread Safety: Each shard has its own error_injection instance with its own callbacks
  • Cross-Shard: Static methods use smp::invoke_on_all() to register callbacks on all shards

2. SSE Endpoint (api/error_injection.cc)

GET /v2/error_injection/events
Content-Type: text/event-stream

Flow:

  1. Client connects to SSE endpoint
  2. Server creates a queue on the current shard
  3. Callback registered on ALL shards that forwards events to this queue (using smp::submit_to)
  4. Server streams events in SSE format: data: {"injection":"name","type":"handler","shard":0}\n\n
  5. On disconnect (client closes or exception), callbacks are cleaned up

Event Format:

{
  "injection": "injection_name",
  "type": "sleep|handler|exception|lambda",
  "shard": 0
}

Python Client (test/pylib/rest_client.py)

InjectionEventStream Class

async with injection_event_stream(node_ip) as stream:
    event = await stream.wait_for_injection("my_injection", timeout=30)

Features:

  • Async context manager for automatic connection/disconnection
  • Background task reads SSE events
  • Queue-based event delivery
  • wait_for_injection() method filters events by injection name

Usage Examples

Basic Usage

async with injection_event_stream(server_ip) as event_stream:
    # Enable injection
    await api.enable_injection(server_ip, "my_injection", one_shot=True)
    
    # Trigger operation that hits injection
    # ... some operation ...
    
    # Wait for injection without log parsing!
    event = await event_stream.wait_for_injection("my_injection", timeout=30)
    logger.info(f"Injection hit on shard {event['shard']}")

Old vs New Approach

Old (Log Parsing):

log = await manager.server_open_log(server_id)
mark = await log.mark()
await api.enable_injection(ip, "my_injection", one_shot=True)
# ... operation ...
mark, _ = await log.wait_for('my_injection: waiting', from_mark=mark)

New (Event Stream):

async with injection_event_stream(ip) as stream:
    await api.enable_injection(ip, "my_injection", one_shot=True)
    # ... operation ...
    event = await stream.wait_for_injection("my_injection", timeout=30)

Benefits

  1. Performance: No waiting for log flushes or buffer processing
  2. Reliability: Direct event notifications, no regex matching failures
  3. Simplicity: Clean async/await pattern
  4. Flexibility: Can wait for multiple injections, get event metadata
  5. Backward Compatible: Existing log-based tests continue to work

Implementation Notes

Thread Safety

  • Each shard has independent error_injection instance
  • Events from any shard are delivered to SSE client via smp::submit_to
  • Queue operations are shard-local, avoiding cross-shard synchronization

Cleanup

  • Client disconnect triggers callback cleanup on all shards
  • Cleanup happens automatically via RAII (try/finally in stream function)
  • No callback leaks even if client disconnects abruptly

Logging

  • Injection triggers now log at INFO level (was DEBUG)
  • This ensures events are visible in logs AND via SSE
  • SSE provides machine-readable events, logs provide human-readable context

Testing

See test/cluster/test_error_injection_events.py for example tests:

  • test_injection_event_stream_basic: Basic functionality
  • test_injection_event_stream_multiple_injections: Multiple injection tracking
  • test_injection_event_vs_log_parsing_comparison: Old vs new comparison

Future Enhancements

Possible improvements:

  1. Filter events by injection name at server side (query parameter)
  2. Include injection parameters in events
  3. Add event timestamps
  4. Support for event history/replay
  5. WebSocket support (if bidirectional communication needed)