From cfec8d0ff6ef367719ad1fe07a0e9ecef8b97d47 Mon Sep 17 00:00:00 2001
From: Iwwww
Date: Fri, 26 Dec 2025 18:04:17 +0300
Subject: [PATCH] feat(collector): add collector service

---
 services/collector/Dockerfile         |  19 +++
 services/collector/__init__.py        |   0
 services/collector/audio_validator.py | 115 ++++++++++++++++++
 services/collector/db_writer.py       | 166 ++++++++++++++++++++++++++
 services/collector/main.py            | 154 ++++++++++++++++++++++++
 services/collector/requirements.txt   |   5 +
 services/collector/serial_reader.py   | 127 ++++++++++++++++++++
 tests/test_device.py                  |  40 -------
 8 files changed, 586 insertions(+), 40 deletions(-)
 create mode 100644 services/collector/Dockerfile
 create mode 100644 services/collector/__init__.py
 create mode 100644 services/collector/audio_validator.py
 create mode 100644 services/collector/db_writer.py
 create mode 100644 services/collector/main.py
 create mode 100644 services/collector/requirements.txt
 create mode 100644 services/collector/serial_reader.py
 delete mode 100644 tests/test_device.py

diff --git a/services/collector/Dockerfile b/services/collector/Dockerfile
new file mode 100644
index 0000000..6878135
--- /dev/null
+++ b/services/collector/Dockerfile
@@ -0,0 +1,19 @@
+FROM python:3.11-slim
+
+WORKDIR /app
+
+RUN apt-get update && apt-get install -y \
+    --no-install-recommends \
+    gcc \
+    && rm -rf /var/lib/apt/lists/*
+
+COPY requirements.txt .
+RUN pip install --no-cache-dir -r requirements.txt
+
+COPY *.py ./
+
+# Run as non-root user
+RUN useradd -m -u 1000 collector && chown -R collector:collector /app
+USER collector
+
+CMD ["python", "main.py"]
diff --git a/services/collector/__init__.py b/services/collector/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/services/collector/audio_validator.py b/services/collector/audio_validator.py
new file mode 100644
index 0000000..dafba19
--- /dev/null
+++ b/services/collector/audio_validator.py
@@ -0,0 +1,115 @@
+#!/usr/bin/env python3
+"""
+FR-2.2: Audio Data Validation
+Validates audio metrics against expected ranges
+"""
+
+from typing import NamedTuple
+
+
+class ValidationResult(NamedTuple):
+    """Validation result"""
+
+    valid: bool
+    error: str = ""
+
+
+class AudioValidator:
+    """
+    Validates audio metrics against hardware constraints and realistic ranges.
+    """
+
+    # Hardware constraints (from FR spec)
+    RMS_MIN_DB = -40.0  # Noise floor
+    RMS_MAX_DB = 80.0  # Clipping threshold
+
+    FREQ_MIN_HZ = 100  # Below = unreliable (FFT bin size ~43Hz)
+    FREQ_MAX_HZ = 8000  # Nyquist @ 22.05kHz with safety margin
+
+    # Extended ranges for detection (not storage)
+    FREQ_MIN_EXTENDED_HZ = 20
+    FREQ_MAX_EXTENDED_HZ = 11000
+
+    @staticmethod
+    def validate_rms(rms_db: float) -> ValidationResult:
+        """
+        Validate RMS value.
+
+        Args:
+            rms_db: RMS in dB
+
+        Returns:
+            ValidationResult with valid flag and error message
+        """
+        # NaN compares False against both bounds below, so reject it explicitly
+        if not isinstance(rms_db, (int, float)) or rms_db != rms_db:
+            return ValidationResult(False, "RMS must be numeric")
+
+        if rms_db < AudioValidator.RMS_MIN_DB:
+            return ValidationResult(
+                False, f"RMS {rms_db:.1f}dB below minimum {AudioValidator.RMS_MIN_DB}dB"
+            )
+
+        if rms_db > AudioValidator.RMS_MAX_DB:
+            return ValidationResult(
+                False,
+                f"RMS {rms_db:.1f}dB exceeds maximum {AudioValidator.RMS_MAX_DB}dB",
+            )
+
+        return ValidationResult(True)
+
+    @staticmethod
+    def validate_frequency(freq_hz: int, strict: bool = True) -> ValidationResult:
+        """
+        Validate frequency value.
+
+        Args:
+            freq_hz: Frequency in Hz
+            strict: If True, use tight range (100-8000Hz), else extended (20-11000Hz)
+
+        Returns:
+            ValidationResult with valid flag and error message
+        """
+        if not isinstance(freq_hz, int):
+            return ValidationResult(False, "Frequency must be integer")
+
+        if strict:
+            min_hz = AudioValidator.FREQ_MIN_HZ
+            max_hz = AudioValidator.FREQ_MAX_HZ
+        else:
+            min_hz = AudioValidator.FREQ_MIN_EXTENDED_HZ
+            max_hz = AudioValidator.FREQ_MAX_EXTENDED_HZ
+
+        if freq_hz < min_hz:
+            return ValidationResult(
+                False, f"Frequency {freq_hz}Hz below minimum {min_hz}Hz"
+            )
+
+        if freq_hz > max_hz:
+            return ValidationResult(
+                False, f"Frequency {freq_hz}Hz exceeds maximum {max_hz}Hz"
+            )
+
+        return ValidationResult(True)
+
+    @staticmethod
+    def validate_packet(rms_db: float, freq_hz: int) -> ValidationResult:
+        """
+        Validate complete audio packet.
+
+        Args:
+            rms_db: RMS in dB
+            freq_hz: Frequency in Hz
+
+        Returns:
+            ValidationResult with valid flag and error message
+        """
+        rms_result = AudioValidator.validate_rms(rms_db)
+        if not rms_result.valid:
+            return rms_result
+
+        freq_result = AudioValidator.validate_frequency(freq_hz, strict=True)
+        if not freq_result.valid:
+            return freq_result
+
+        return ValidationResult(True)
diff --git a/services/collector/db_writer.py b/services/collector/db_writer.py
new file mode 100644
index 0000000..a0c4095
--- /dev/null
+++ b/services/collector/db_writer.py
@@ -0,0 +1,166 @@
+#!/usr/bin/env python3
+"""
+FR-2.3: Database Writer with Batch Processing
+Buffers audio metrics and writes in batches (50 records or 5 seconds)
+"""
+
+import asyncio
+import logging
+import time
+from datetime import datetime, timezone
+from typing import List, Optional
+from dataclasses import dataclass
+
+import asyncpg
+
+logger = logging.getLogger(__name__)
+
+
+@dataclass
+class AudioRecord:
+    """Single audio measurement record"""
+
+    timestamp: datetime
+    rms_db: float
+    freq_hz: int
+    is_silence: bool
+
+
+class DatabaseWriter:
+    """
+    Batched database writer for audio metrics.
+    Flushes on: 50 records OR 5 seconds timeout
+    """
+
+    BATCH_SIZE = 50
+    BATCH_TIMEOUT = 5.0  # seconds
+    SILENCE_THRESHOLD_DB = -30.0  # dB below = silence
+
+    def __init__(self, db_url: str):
+        self.db_url = db_url
+        self.pool: Optional[asyncpg.Pool] = None
+        self.buffer: List[AudioRecord] = []
+        self.last_flush_time = time.monotonic()
+        self._flush_task: Optional[asyncio.Task] = None
+        self._running = False
+
+    async def connect(self):
+        """Establish database connection pool"""
+        try:
+            self.pool = await asyncpg.create_pool(
+                self.db_url, min_size=2, max_size=5, command_timeout=10.0
+            )
+            logger.info("Database connection pool established")
+
+            # Test connection
+            async with self.pool.acquire() as conn:
+                await conn.fetchval("SELECT 1")
+
+        except Exception as e:
+            logger.error(f"Failed to connect to database: {e}")
+            raise
+
+    async def close(self):
+        """Close database connection and flush remaining data"""
+        self._running = False
+
+        if self._flush_task and not self._flush_task.done():
+            self._flush_task.cancel()
+            try:
+                await self._flush_task
+            except asyncio.CancelledError:
+                pass
+
+        await self.flush()
+
+        if self.pool:
+            await self.pool.close()
+            logger.info("Database connection closed")
+
+    async def start_auto_flush(self):
+        """Start background task for timeout-based flushing"""
+        self._running = True
+        self._flush_task = asyncio.create_task(self._auto_flush_loop())
+
+    async def _auto_flush_loop(self):
+        """Background task: flush buffer every BATCH_TIMEOUT seconds"""
+        while self._running:
+            try:
+                await asyncio.sleep(self.BATCH_TIMEOUT)
+
+                current_time = time.monotonic()
+                if self.buffer and (
+                    current_time - self.last_flush_time >= self.BATCH_TIMEOUT
+                ):
+                    await self.flush()
+
+            except asyncio.CancelledError:
+                break
+            except Exception as e:
+                logger.error(f"Error in auto-flush loop: {e}")
+
+    async def add_record(self, timestamp_ms: int, rms_db: float, freq_hz: int):
+        """
+        Add single record to buffer. Flushes if batch size reached.
+
+        Args:
+            timestamp_ms: MCU timestamp in milliseconds
+            rms_db: RMS value in dB
+            freq_hz: Dominant frequency in Hz
+        """
+        # The MCU timestamp wraps around, so it is not stored: server receive
+        # time (UTC) is used as the absolute reference instead
+        timestamp = datetime.now(timezone.utc)
+
+        # Detect silence
+        is_silence = rms_db < self.SILENCE_THRESHOLD_DB
+
+        record = AudioRecord(
+            timestamp=timestamp, rms_db=rms_db, freq_hz=freq_hz, is_silence=is_silence
+        )
+
+        self.buffer.append(record)
+        logger.debug(
+            f"Buffered: rms={rms_db:.1f}dB freq={freq_hz}Hz "
+            f"silence={is_silence} (buffer={len(self.buffer)})"
+        )
+
+        # Flush if batch size reached
+        if len(self.buffer) >= self.BATCH_SIZE:
+            await self.flush()
+
+    async def flush(self):
+        """Write all buffered records to database"""
+        if not self.buffer:
+            return
+
+        if not self.pool:
+            logger.error("Database pool not initialized, cannot flush")
+            return
+
+        records_to_write = self.buffer[:]
+        self.buffer.clear()
+        self.last_flush_time = time.monotonic()
+
+        try:
+            async with self.pool.acquire() as conn:
+                # Prepare batch insert
+                records = [
+                    (r.timestamp, r.rms_db, r.freq_hz, r.is_silence)
+                    for r in records_to_write
+                ]
+
+                await conn.executemany(
+                    """
+                    INSERT INTO audio_data (time, rms_db, frequency_hz, is_silence)
+                    VALUES ($1, $2, $3, $4)
+                    """,
+                    records,
+                )
+
+            logger.info(f"Flushed {len(records)} records to database")
+
+        except Exception as e:
+            logger.error(f"Failed to write batch to database: {e}")
+            # The batch is dropped rather than re-queued: retrying indefinitely
+            # while the database is down would grow the buffer without bound
diff --git a/services/collector/main.py b/services/collector/main.py
new file mode 100644
index 0000000..cb84782
--- /dev/null
+++ b/services/collector/main.py
@@ -0,0 +1,154 @@
+#!/usr/bin/env python3
+"""
+FR-2: Audio Data Collector Service
+Reads audio metrics from STM32, validates, and writes to TimescaleDB
+"""
+
+import asyncio
+import logging
+import os
+import signal
+import sys
+from urllib.parse import quote
+
+from serial_reader import SerialReader
+from audio_validator import AudioValidator
+from db_writer import DatabaseWriter
+from protocol_parser import AudioMetrics
+
+logging.basicConfig(
+    level=logging.INFO,
+    format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
+    datefmt="%Y-%m-%d %H:%M:%S",
+)
+
+logger = logging.getLogger(__name__)
+
+
+class CollectorService:
+    """Main collector service orchestrating serial reading and database writing"""
+
+    def __init__(self, serial_port: str, db_url: str, baudrate: int = 115200):
+        self.serial_reader = SerialReader(
+            port=serial_port, baudrate=baudrate, on_packet=self._handle_packet
+        )
+        self.db_writer = DatabaseWriter(db_url=db_url)
+        self.validator = AudioValidator()
+
+        self._shutdown_event = asyncio.Event()
+
+    async def _handle_packet(self, packet: AudioMetrics):
+        """
+        Process received audio packet: validate and write to database.
+
+        Args:
+            packet: Parsed audio metrics packet
+        """
+        # Validate packet
+        validation = self.validator.validate_packet(packet.rms_db, packet.freq_hz)
+
+        if not validation.valid:
+            logger.warning(
+                f"Invalid packet: {validation.error} "
+                f"(rms={packet.rms_db:.1f}dB freq={packet.freq_hz}Hz)"
+            )
+            return
+
+        # Write to database
+        try:
+            await self.db_writer.add_record(
+                timestamp_ms=packet.timestamp_ms,
+                rms_db=packet.rms_db,
+                freq_hz=packet.freq_hz,
+            )
+        except Exception as e:
+            logger.error(f"Failed to add record to database: {e}")
+
+    async def start(self):
+        """Start collector service"""
+        logger.info("Starting Audio Data Collector Service")
+
+        try:
+            # Connect to database
+            await self.db_writer.connect()
+            await self.db_writer.start_auto_flush()
+
+            # Connect to serial port
+            await self.serial_reader.connect()
+            await self.serial_reader.start_reading()
+
+            logger.info("Service started successfully")
+
+            # Wait for shutdown signal
+            await self._shutdown_event.wait()
+
+        except Exception as e:
+            logger.error(f"Service startup failed: {e}")
+            raise
+        finally:
+            await self.stop()
+
+    async def stop(self):
+        """Stop collector service gracefully"""
+        logger.info("Stopping Audio Data Collector Service")
+
+        # Disconnect serial reader
+        await self.serial_reader.disconnect()
+
+        # Close database writer (flushes remaining data)
+        await self.db_writer.close()
+
+        logger.info("Service stopped")
+
+    def shutdown(self):
+        """Trigger graceful shutdown"""
+        logger.info("Shutdown requested")
+        self._shutdown_event.set()
+
+
+def main():
+    """Main entry point"""
+    # Read configuration from environment
+    SERIAL_PORT = os.getenv("SERIAL_PORT", "/dev/ttyACM0")
+    BAUDRATE = int(os.getenv("BAUDRATE", "115200"))
+    DB_HOST = os.getenv("DB_HOST", "localhost")
+    DB_PORT = os.getenv("DB_PORT", "5432")
+    DB_NAME = os.getenv("DB_NAME", "audio_analyzer")
+    DB_USER = os.getenv("DB_USER", "postgres")
+    DB_PASSWORD = os.getenv("DB_PASSWORD", "postgres")
+
+    # Percent-encode the credentials: characters such as "@", ":" or "/" in a
+    # password would otherwise corrupt the connection URL
+    db_url = (
+        f"postgresql://{quote(DB_USER, safe='')}:{quote(DB_PASSWORD, safe='')}"
+        f"@{DB_HOST}:{DB_PORT}/{DB_NAME}"
+    )
+
+    # Create the event loop explicitly and before the service is built:
+    # asyncio.get_event_loop() without a running loop is deprecated
+    loop = asyncio.new_event_loop()
+    asyncio.set_event_loop(loop)
+
+    # Create service
+    service = CollectorService(
+        serial_port=SERIAL_PORT, db_url=db_url, baudrate=BAUDRATE
+    )
+
+    # Setup signal handlers for graceful shutdown
+    for sig in (signal.SIGTERM, signal.SIGINT):
+        loop.add_signal_handler(sig, service.shutdown)
+
+    try:
+        # Run service
+        loop.run_until_complete(service.start())
+    except KeyboardInterrupt:
+        logger.info("Interrupted by user")
+    except Exception as e:
+        logger.error(f"Service error: {e}")
+        sys.exit(1)
+    finally:
+        loop.close()
+
+
+if __name__ == "__main__":
+    main()
diff --git a/services/collector/requirements.txt b/services/collector/requirements.txt
new file mode 100644
index 0000000..d07db00
--- /dev/null
+++ b/services/collector/requirements.txt
@@ -0,0 +1,5 @@
+pyserial
+asyncpg
+numpy
+pytest
+pytest-asyncio
diff --git a/services/collector/serial_reader.py b/services/collector/serial_reader.py
new file mode 100644
index 0000000..8025d16
--- /dev/null
+++ b/services/collector/serial_reader.py
@@ -0,0 +1,127 @@
+#!/usr/bin/env python3
+"""
+FR-2.1: Asynchronous Serial Reader
+Reads binary packets from STM32 via USB CDC
+"""
+
+import asyncio
+import logging
+from typing import Awaitable, Callable, Optional
+
+import serial
+from serial import SerialException
+
+from protocol_parser import ProtocolParser, AudioMetrics
+
+logger = logging.getLogger(__name__)
+
+
+class SerialReader:
+    """
+    Asynchronous serial port reader with protocol parsing.
+    """
+
+    def __init__(
+        self,
+        port: str,
+        baudrate: int = 115200,
+        on_packet: Optional[Callable[[AudioMetrics], Awaitable[None]]] = None,
+    ):
+        """
+        Initialize serial reader.
+
+        Args:
+            port: Serial port path (e.g., /dev/ttyACM0)
+            baudrate: Serial baudrate (default 115200)
+            on_packet: Async callback for each received packet
+        """
+        self.port = port
+        self.baudrate = baudrate
+        self.on_packet = on_packet
+
+        self.serial: Optional[serial.Serial] = None
+        self.parser = ProtocolParser()
+        self._running = False
+        self._read_task: Optional[asyncio.Task] = None
+        self._last_stats_rx = 0  # packets_received at the last stats log line
+
+    async def connect(self):
+        """Open serial port connection"""
+        try:
+            self.serial = serial.Serial(
+                self.port, self.baudrate, timeout=0.1, write_timeout=1.0
+            )
+            logger.info(f"Connected to {self.port} @ {self.baudrate} baud")
+        except SerialException as e:
+            logger.error(f"Failed to open serial port {self.port}: {e}")
+            raise
+
+    async def disconnect(self):
+        """Close serial port connection"""
+        self._running = False
+
+        if self._read_task and not self._read_task.done():
+            self._read_task.cancel()
+            try:
+                await self._read_task
+            except asyncio.CancelledError:
+                pass
+
+        if self.serial and self.serial.is_open:
+            self.serial.close()
+            logger.info("Serial port closed")
+
+    async def start_reading(self):
+        """Start background task for reading serial data"""
+        self._running = True
+        self._read_task = asyncio.create_task(self._read_loop())
+
+    async def _read_loop(self):
+        """Background task: continuously read and parse serial data"""
+        while self._running:
+            try:
+                # Read only what is already buffered so read() does not block
+                if self.serial.in_waiting > 0:
+                    data = self.serial.read(self.serial.in_waiting or 1)
+
+                    if data:
+                        # Parse packets
+                        packets = self.parser.feed(data)
+
+                        # Process each packet
+                        for packet in packets:
+                            if self.on_packet:
+                                try:
+                                    await self.on_packet(packet)
+                                except Exception as e:
+                                    logger.error(f"Error in packet handler: {e}")
+
+                        # Log statistics once per 100 newly received packets
+                        stats = self.parser.get_stats()
+                        if stats.packets_received - self._last_stats_rx >= 100:
+                            self._last_stats_rx = stats.packets_received
+                            logger.info(
+                                f"Stats: RX={stats.packets_received} "
+                                f"CRC_err={stats.crc_errors} "
+                                f"LEN_err={stats.length_errors} "
+                                f"Range_err={stats.range_errors}"
+                            )
+
+                # Small delay to prevent CPU spinning
+                await asyncio.sleep(0.01)
+
+            except SerialException as e:
+                logger.error(f"Serial read error: {e}")
+                self._running = False
+                break
+            except asyncio.CancelledError:
+                break
+            except Exception as e:
+                logger.error(f"Unexpected error in read loop: {e}")
+                # Yield before retrying: without an await here a persistent
+                # failure would spin forever and starve the event loop
+                await asyncio.sleep(0.01)
+
+    def get_stats(self):
+        """Get protocol parser statistics"""
+        return self.parser.get_stats()
diff --git a/tests/test_device.py b/tests/test_device.py
deleted file mode 100644
index e354ea9..0000000
--- a/tests/test_device.py
+++ /dev/null
@@ -1,40 +0,0 @@
-#!/usr/bin/env python3
-import threading
-import time
-
-import serial
-
-PORT = "/dev/ttyACM0"
-BAUDRATE = 115200
-
-
-def read_from_port(ser):
-    while True:
-        if ser.in_waiting > 0:
-            data = ser.read(ser.in_waiting)
-            print(f"Received: {data.decode('utf-8', errors='ignore')}")
-
-
-try:
-    ser = serial.Serial(PORT, BAUDRATE, timeout=1)
-    print(f"Connected to {PORT}")
-
-    # Запускаем чтение в фоне
-    thread = threading.Thread(target=read_from_port, args=(ser,), daemon=True)
-    thread.start()
-
-    # Пишем данные
-    counter = 0
-    while True:
-        msg = f"Ping {counter}\n"
-        ser.write(msg.encode("utf-8"))
-        print(f"Sent: {msg.strip()}")
-        counter += 1
-        time.sleep(1)
-
-except serial.SerialException as e:
-    print(f"Error: {e}")
-except KeyboardInterrupt:
-    print("\nExiting...")
-    if "ser" in locals() and ser.is_open:
-        ser.close()