#!/usr/bin/env python3
"""
FR-2.3: Database Writer with Batch Processing
Buffers audio metrics and writes in batches
"""

import asyncio
import logging
import time
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import List, Optional, final

import asyncpg

logger = logging.getLogger(__name__)


@dataclass
class AudioRecord:
    """Single audio measurement record"""

    timestamp: datetime  # server-side UTC time at which the sample was buffered
    rms_db: float
    freq_hz: int
    is_silence: bool


class DatabaseWriter:
    """
    Batched database writer for audio metrics.

    Records are buffered in memory and written to the ``audio_data`` table
    with a single ``executemany`` call. A flush happens when BATCH_SIZE
    records are buffered or, while the auto-flush task is running, when
    BATCH_TIMEOUT seconds have passed since the previous flush.
    """

    BATCH_SIZE = 50
    BATCH_TIMEOUT: float = 5.0  # seconds
    SILENCE_THRESHOLD_DB: float = -30.0  # dB below = silence

    def __init__(self, db_url: str):
        self.db_url = db_url
        self.pool: Optional[asyncpg.Pool] = None
        self.buffer: List[AudioRecord] = []
        # time.monotonic() needs no event loop, so the writer can be built
        # before asyncio.run(); asyncio.get_event_loop() outside a running
        # loop is deprecated and errors on newer Python versions.
        self.last_flush_time = time.monotonic()
        self._flush_task: Optional[asyncio.Task] = None
        # Set by close() to wake the auto-flush loop so it exits between
        # flushes instead of being cancelled in the middle of one.
        self._stop_event: Optional[asyncio.Event] = None
        self._running = False

    async def connect(self):
        """Create the connection pool and verify it with ``SELECT 1``.

        ``self.pool`` is only assigned once the health check succeeds.

        Raises:
            Exception: whatever ``asyncpg`` raised while creating or testing
                the pool; a pool created before the failure is terminated so
                its connections are not leaked.
        """
        pool = None
        try:
            pool = await asyncpg.create_pool(
                self.db_url, min_size=2, max_size=5, command_timeout=10.0
            )
            logger.info("Database connection pool established")
            # Test connection
            async with pool.acquire() as conn:
                await conn.fetchval("SELECT 1")
        except Exception as e:
            logger.error(f"Failed to connect to database: {e}")
            if pool is not None:
                pool.terminate()
            raise
        self.pool = pool

    async def close(self):
        """Stop auto-flushing, write any buffered records and close the pool.

        The auto-flush task is signalled to stop and awaited rather than
        cancelled: cancelling it mid-flush would discard the batch it had
        already taken out of the buffer.
        """
        self._running = False
        if self._stop_event is not None:
            self._stop_event.set()
        task = self._flush_task
        if task is not None and not task.done():
            # asyncio.wait() does not re-raise the task's outcome, so a task
            # cancelled elsewhere cannot abort shutdown, while cancellation of
            # close() itself still propagates to the caller.
            await asyncio.wait({task})

        await self.flush()

        if self.pool is not None:
            await self.pool.close()
            # Later flush() calls then report "not initialized" instead of
            # trying to acquire from a closed pool.
            self.pool = None
        logger.info("Database connection closed")

    async def start_auto_flush(self):
        """Start the background task that performs timeout-based flushes.

        Does nothing if the task is already running, so at most one loop
        runs per writer.
        """
        if self._flush_task is not None and not self._flush_task.done():
            return
        self._running = True
        # Fresh event per start: an event left set by a previous close()
        # would otherwise stop the new loop immediately.
        self._stop_event = asyncio.Event()
        self._flush_task = asyncio.create_task(self._auto_flush_loop())

    async def _auto_flush_loop(self):
        """Background task: flush the buffer once BATCH_TIMEOUT has elapsed.

        The timeout is measured from ``last_flush_time``, which every flush
        (including size-triggered ones from ``add_record``) resets. The loop
        sleeps only until that deadline rather than a fixed period, so a
        size-triggered flush no longer pushes the next timeout flush out by
        almost a whole extra period. Exits once ``_running`` is cleared and
        the stop event is set.
        """
        if self._stop_event is None:
            self._stop_event = asyncio.Event()
        stop_event = self._stop_event

        while self._running:
            delay = self.BATCH_TIMEOUT
            try:
                elapsed = time.monotonic() - self.last_flush_time
                if elapsed >= self.BATCH_TIMEOUT and self.buffer:
                    await self.flush()
                    elapsed = time.monotonic() - self.last_flush_time
                if elapsed < self.BATCH_TIMEOUT:
                    # Wake exactly when the current batch becomes due.
                    delay = self.BATCH_TIMEOUT - elapsed
                # Otherwise the timer was not reset (empty buffer, or flush
                # could not run because there is no pool): keep the full
                # period so the loop never spins without yielding.
            except Exception as e:
                logger.error(f"Error in auto-flush loop: {e}")

            try:
                # Sleep for `delay`, but wake immediately when close() signals.
                await asyncio.wait_for(stop_event.wait(), timeout=delay)
            except asyncio.TimeoutError:
                pass

    async def add_record(self, timestamp_ms: int, rms_db: float, freq_hz: int):
        """
        Add single record to buffer. Flushes if batch size reached.

        Args:
            timestamp_ms: MCU timestamp in milliseconds. Currently unused:
                the MCU counter wraps around, so the server's UTC time at
                buffering is stored as the record timestamp instead.
            rms_db: RMS value in dB
            freq_hz: Dominant frequency in Hz
        """
        record = AudioRecord(
            timestamp=datetime.now(timezone.utc),
            rms_db=rms_db,
            freq_hz=freq_hz,
            # Strictly below the threshold counts as silence.
            is_silence=rms_db < self.SILENCE_THRESHOLD_DB,
        )
        self.buffer.append(record)

        # Lazy %-formatting: this runs once per sample, so the message is only
        # built when DEBUG logging is enabled. Output text is unchanged.
        logger.debug(
            "Buffered: rms=%.1fdB freq=%sHz silence=%s (buffer=%d)",
            rms_db,
            freq_hz,
            record.is_silence,
            len(self.buffer),
        )

        if len(self.buffer) >= self.BATCH_SIZE:
            await self.flush()

    async def flush(self):
        """Write all buffered records to the database in a single batch.

        Does nothing when the buffer is empty. If no pool exists, an error is
        logged and the records stay in the buffer. Otherwise the buffer is
        copied and cleared before the first ``await``, so records added while
        the write is in progress start a new batch.
        """
        if not self.buffer:
            return

        if self.pool is None:
            logger.error("Database pool not initialized, cannot flush")
            return

        records_to_write = self.buffer[:]
        self.buffer.clear()
        # Reset the timeout before awaiting so a failed write does not make
        # the auto-flush loop retry immediately.
        self.last_flush_time = time.monotonic()

        rows = [
            (r.timestamp, r.rms_db, r.freq_hz, r.is_silence)
            for r in records_to_write
        ]
        try:
            async with self.pool.acquire() as conn:
                await conn.executemany(
                    """
                    INSERT INTO audio_data (time, rms_db, frequency_hz, is_silence)
                    VALUES ($1, $2, $3, $4)
                    """,
                    rows,
                )
            logger.info(f"Flushed {len(rows)} records to database")
        except Exception as e:
            # The failed batch is deliberately dropped rather than re-queued,
            # so memory stays bounded while the database is unreachable.
            logger.error(f"Failed to write batch to database: {e}")