Files
2025-12-28 22:25:45 +03:00

249 lines
8.0 KiB
Python

#!/usr/bin/env python3
"""
FR-2: Audio Data Collector Service with WebSocket Live Streaming
Reads audio metrics from STM32, validates, writes to DB, and streams via WebSocket.
"""
from __future__ import annotations
import asyncio
import logging
import os
import signal
import sys
from contextlib import suppress
from datetime import datetime, timezone
from typing import Callable, Optional
import uvicorn
from audio_validator import AudioValidator
from db_writer import DatabaseWriter
from protocol_parser import AudioMetrics
from serial_reader import SerialReader
from ws_app import app as ws_app
from ws_app import manager
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
)
logger = logging.getLogger(__name__)
def _iso_z(dt: datetime) -> str:
"""Format datetime as ISO8601 with 'Z' suffix (UTC)."""
return dt.astimezone(timezone.utc).isoformat().replace("+00:00", "Z")
class CollectorService:
"""Main collector service: serial → validate → DB + WebSocket."""
def __init__(self, serial_port: str, db_url: str, baudrate: int = 115200) -> None:
self.serial_reader = SerialReader(
port=serial_port, baudrate=baudrate, on_packet=self._handle_packet
)
self.db_writer = DatabaseWriter(db_url=db_url)
self.validator = AudioValidator()
self._shutdown_event = asyncio.Event()
# WebSocket broadcast queue (bounded to prevent memory issues)
self._ws_queue: asyncio.Queue[dict] = asyncio.Queue(maxsize=200)
self._ws_broadcast_task: Optional[asyncio.Task[None]] = None
# Uvicorn server for WebSocket endpoint
self._uvicorn_server: Optional[uvicorn.Server] = None
self._ws_server_task: Optional[asyncio.Task[None]] = None
def shutdown(self) -> None:
"""Trigger graceful shutdown (called from signal handler)."""
logger.info("Shutdown requested")
self._shutdown_event.set()
async def _ws_broadcast_loop(self) -> None:
"""Background task: consume queue and broadcast to WebSocket clients."""
try:
while True:
msg = await self._ws_queue.get()
try:
await manager.broadcast_json(msg)
except Exception as e:
logger.error("WS broadcast error: %s", e)
finally:
self._ws_queue.task_done()
except asyncio.CancelledError:
logger.debug("WS broadcast loop cancelled")
async def _start_ws_server(self) -> None:
"""Start uvicorn server for WebSocket endpoint."""
host = os.getenv("WS_HOST", "0.0.0.0")
port = int(os.getenv("WS_PORT", "8001"))
config = uvicorn.Config(
ws_app,
host=host,
port=port,
log_level="warning",
loop="asyncio",
access_log=False,
)
self._uvicorn_server = uvicorn.Server(config)
try:
logger.info("Starting WebSocket server on ws://%s:%d/ws/live", host, port)
await self._uvicorn_server.serve()
except SystemExit as e:
logger.error("WS server failed (port %d already in use?): %s", port, e)
self.shutdown()
except Exception as e:
logger.exception("WS server crashed: %s", e)
self.shutdown()
async def _handle_packet(self, packet: AudioMetrics) -> None:
"""
Process received audio packet: validate, write to DB, push to WebSocket.
Args:
packet: Parsed audio metrics from STM32
"""
# Validate packet
validation = self.validator.validate_packet(packet.rms_db, packet.freq_hz)
if not validation.valid:
logger.warning(
"Invalid packet: %s (rms=%.1fdB freq=%dHz)",
validation.error,
packet.rms_db,
packet.freq_hz,
)
return
# Push to WebSocket queue (non-blocking)
msg = {
"time": _iso_z(datetime.now(timezone.utc)),
"rms_db": float(packet.rms_db),
"freq_hz": int(packet.freq_hz),
}
try:
self._ws_queue.put_nowait(msg)
except asyncio.QueueFull:
# Drop oldest message if queue full
try:
_ = self._ws_queue.get_nowait()
self._ws_queue.task_done()
except asyncio.QueueEmpty:
pass
self._ws_queue.put_nowait(msg)
# Write to database
try:
await self.db_writer.add_record(
timestamp_ms=packet.timestamp_ms,
rms_db=packet.rms_db,
freq_hz=packet.freq_hz,
)
except Exception as e:
logger.error("Failed to write to database: %s", e)
async def start(self) -> None:
"""Start collector service: DB, WS, serial reader."""
logger.info("Starting Audio Data Collector Service")
try:
# Connect to database
await self.db_writer.connect()
await self.db_writer.start_auto_flush()
# Start WebSocket server and broadcaster
self._ws_broadcast_task = asyncio.create_task(self._ws_broadcast_loop())
self._ws_server_task = asyncio.create_task(self._start_ws_server())
# Give uvicorn a moment to bind (avoid race on port check)
await asyncio.sleep(0.5)
# Connect to serial port
await self.serial_reader.connect()
await self.serial_reader.start_reading()
logger.info("Service started successfully")
# Wait for shutdown signal
await self._shutdown_event.wait()
except Exception as e:
logger.error("Service startup failed: %s", e)
raise
finally:
await self.stop()
async def stop(self) -> None:
"""Stop collector service gracefully."""
logger.info("Stopping Audio Data Collector Service")
# Stop serial reader
await self.serial_reader.disconnect()
# Stop WebSocket server
if self._uvicorn_server is not None:
self._uvicorn_server.should_exit = True
if self._ws_server_task is not None:
self._ws_server_task.cancel()
with suppress(asyncio.CancelledError, SystemExit, Exception):
await self._ws_server_task
# Stop WebSocket broadcaster
if self._ws_broadcast_task is not None:
self._ws_broadcast_task.cancel()
with suppress(asyncio.CancelledError):
await self._ws_broadcast_task
# Close database writer (flushes remaining data)
await self.db_writer.close()
logger.info("Service stopped")
async def _amain() -> None:
"""Async main entry point."""
# Read configuration from environment
serial_port = os.getenv("SERIAL_PORT", "/dev/ttyACM0")
baudrate = int(os.getenv("BAUDRATE", "115200"))
db_host = os.getenv("DB_HOST", "localhost")
db_port = os.getenv("DB_PORT", "5432")
db_name = os.getenv("DB_NAME", "audio_analyzer")
db_user = os.getenv("DB_USER", "postgres")
db_password = os.getenv("DB_PASSWORD", "postgres")
db_url = f"postgresql://{db_user}:{db_password}@{db_host}:{db_port}/{db_name}"
# Create service
service = CollectorService(
serial_port=serial_port, db_url=db_url, baudrate=baudrate
)
# Setup signal handlers for graceful shutdown
loop = asyncio.get_running_loop()
shutdown_callback: Callable[[], None] = service.shutdown
for sig in (signal.SIGTERM, signal.SIGINT):
loop.add_signal_handler(sig, shutdown_callback)
await service.start()
def main() -> None:
"""Main entry point."""
try:
asyncio.run(_amain())
except KeyboardInterrupt:
logger.info("Interrupted by user")
except Exception:
logger.exception("Service error")
sys.exit(1)
if __name__ == "__main__":
main()