#!/usr/bin/env python3 """ FR-2: Audio Data Collector Service with WebSocket Live Streaming Reads audio metrics from STM32, validates, writes to DB, and streams via WebSocket. """ from __future__ import annotations import asyncio import logging import os import signal import sys from contextlib import suppress from datetime import datetime, timezone from typing import Callable, Optional import uvicorn from audio_validator import AudioValidator from db_writer import DatabaseWriter from protocol_parser import AudioMetrics from serial_reader import SerialReader from ws_app import app as ws_app from ws_app import manager logging.basicConfig( level=logging.INFO, format="%(asctime)s [%(levelname)s] %(name)s: %(message)s", datefmt="%Y-%m-%d %H:%M:%S", ) logger = logging.getLogger(__name__) def _iso_z(dt: datetime) -> str: """Format datetime as ISO8601 with 'Z' suffix (UTC).""" return dt.astimezone(timezone.utc).isoformat().replace("+00:00", "Z") class CollectorService: """Main collector service: serial → validate → DB + WebSocket.""" def __init__(self, serial_port: str, db_url: str, baudrate: int = 115200) -> None: self.serial_reader = SerialReader( port=serial_port, baudrate=baudrate, on_packet=self._handle_packet ) self.db_writer = DatabaseWriter(db_url=db_url) self.validator = AudioValidator() self._shutdown_event = asyncio.Event() # WebSocket broadcast queue (bounded to prevent memory issues) self._ws_queue: asyncio.Queue[dict] = asyncio.Queue(maxsize=200) self._ws_broadcast_task: Optional[asyncio.Task[None]] = None # Uvicorn server for WebSocket endpoint self._uvicorn_server: Optional[uvicorn.Server] = None self._ws_server_task: Optional[asyncio.Task[None]] = None def shutdown(self) -> None: """Trigger graceful shutdown (called from signal handler).""" logger.info("Shutdown requested") self._shutdown_event.set() async def _ws_broadcast_loop(self) -> None: """Background task: consume queue and broadcast to WebSocket clients.""" try: while True: msg = await self._ws_queue.get() try: await manager.broadcast_json(msg) except Exception as e: logger.error("WS broadcast error: %s", e) finally: self._ws_queue.task_done() except asyncio.CancelledError: logger.debug("WS broadcast loop cancelled") async def _start_ws_server(self) -> None: """Start uvicorn server for WebSocket endpoint.""" host = os.getenv("WS_HOST", "0.0.0.0") port = int(os.getenv("WS_PORT", "8001")) config = uvicorn.Config( ws_app, host=host, port=port, log_level="warning", loop="asyncio", access_log=False, ) self._uvicorn_server = uvicorn.Server(config) try: logger.info("Starting WebSocket server on ws://%s:%d/ws/live", host, port) await self._uvicorn_server.serve() except SystemExit as e: logger.error("WS server failed (port %d already in use?): %s", port, e) self.shutdown() except Exception as e: logger.exception("WS server crashed: %s", e) self.shutdown() async def _handle_packet(self, packet: AudioMetrics) -> None: """ Process received audio packet: validate, write to DB, push to WebSocket. Args: packet: Parsed audio metrics from STM32 """ # Validate packet validation = self.validator.validate_packet(packet.rms_db, packet.freq_hz) if not validation.valid: logger.warning( "Invalid packet: %s (rms=%.1fdB freq=%dHz)", validation.error, packet.rms_db, packet.freq_hz, ) return # Push to WebSocket queue (non-blocking) msg = { "time": _iso_z(datetime.now(timezone.utc)), "rms_db": float(packet.rms_db), "freq_hz": int(packet.freq_hz), } try: self._ws_queue.put_nowait(msg) except asyncio.QueueFull: # Drop oldest message if queue full try: _ = self._ws_queue.get_nowait() self._ws_queue.task_done() except asyncio.QueueEmpty: pass self._ws_queue.put_nowait(msg) # Write to database try: await self.db_writer.add_record( timestamp_ms=packet.timestamp_ms, rms_db=packet.rms_db, freq_hz=packet.freq_hz, ) except Exception as e: logger.error("Failed to write to database: %s", e) async def start(self) -> None: """Start collector service: DB, WS, serial reader.""" logger.info("Starting Audio Data Collector Service") try: # Connect to database await self.db_writer.connect() await self.db_writer.start_auto_flush() # Start WebSocket server and broadcaster self._ws_broadcast_task = asyncio.create_task(self._ws_broadcast_loop()) self._ws_server_task = asyncio.create_task(self._start_ws_server()) # Give uvicorn a moment to bind (avoid race on port check) await asyncio.sleep(0.5) # Connect to serial port await self.serial_reader.connect() await self.serial_reader.start_reading() logger.info("Service started successfully") # Wait for shutdown signal await self._shutdown_event.wait() except Exception as e: logger.error("Service startup failed: %s", e) raise finally: await self.stop() async def stop(self) -> None: """Stop collector service gracefully.""" logger.info("Stopping Audio Data Collector Service") # Stop serial reader await self.serial_reader.disconnect() # Stop WebSocket server if self._uvicorn_server is not None: self._uvicorn_server.should_exit = True if self._ws_server_task is not None: self._ws_server_task.cancel() with suppress(asyncio.CancelledError, SystemExit, Exception): await self._ws_server_task # Stop WebSocket broadcaster if self._ws_broadcast_task is not None: self._ws_broadcast_task.cancel() with suppress(asyncio.CancelledError): await self._ws_broadcast_task # Close database writer (flushes remaining data) await self.db_writer.close() logger.info("Service stopped") async def _amain() -> None: """Async main entry point.""" # Read configuration from environment serial_port = os.getenv("SERIAL_PORT", "/dev/ttyACM0") baudrate = int(os.getenv("BAUDRATE", "115200")) db_host = os.getenv("DB_HOST", "localhost") db_port = os.getenv("DB_PORT", "5432") db_name = os.getenv("DB_NAME", "audio_analyzer") db_user = os.getenv("DB_USER", "postgres") db_password = os.getenv("DB_PASSWORD", "postgres") db_url = f"postgresql://{db_user}:{db_password}@{db_host}:{db_port}/{db_name}" # Create service service = CollectorService( serial_port=serial_port, db_url=db_url, baudrate=baudrate ) # Setup signal handlers for graceful shutdown loop = asyncio.get_running_loop() shutdown_callback: Callable[[], None] = service.shutdown for sig in (signal.SIGTERM, signal.SIGINT): loop.add_signal_handler(sig, shutdown_callback) await service.start() def main() -> None: """Main entry point.""" try: asyncio.run(_amain()) except KeyboardInterrupt: logger.info("Interrupted by user") except Exception: logger.exception("Service error") sys.exit(1) if __name__ == "__main__": main()