Files
sound-analyze/services/collector/db_writer.py

166 lines
5.0 KiB
Python

#!/usr/bin/env python3
"""
FR-2.3: Database Writer with Batch Processing
Buffers audio metrics and writes in batches (50 records or 5 seconds)
"""
import asyncio
import logging
import time
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import List, Optional

import asyncpg
# Module-level logger named after this module; every class below logs through it.
logger = logging.getLogger(__name__)
@dataclass
class AudioRecord:
    """One audio measurement as it is held in the write buffer.

    The four fields map, in order, onto the columns written by the batch
    insert: time, rms_db, frequency_hz, is_silence.
    """

    # Server-side UTC time at which the sample was buffered.
    timestamp: datetime
    # RMS level of the sample, in dB.
    rms_db: float
    # Dominant frequency of the sample, in Hz.
    freq_hz: int
    # True when rms_db was below the writer's silence threshold.
    is_silence: bool
class DatabaseWriter:
    """
    Batched database writer for audio metrics.

    Records are collected in an in-memory buffer and written with a single
    ``executemany`` call. A flush happens when either:

    * ``BATCH_SIZE`` records have accumulated (checked in ``add_record``), or
    * ``BATCH_TIMEOUT`` seconds have passed since the previous flush
      (requires ``start_auto_flush()`` to have been called).

    Delivery is best-effort: a batch whose insert fails is logged and dropped.
    """

    BATCH_SIZE = 50
    BATCH_TIMEOUT = 5.0  # seconds
    SILENCE_THRESHOLD_DB = -30.0  # dB below = silence

    def __init__(self, db_url: str):
        """
        Create an unconnected writer; call ``connect()`` before flushing.

        Args:
            db_url: Connection string handed to ``asyncpg.create_pool``.
        """
        self.db_url = db_url
        self.pool: Optional[asyncpg.Pool] = None
        self.buffer: List[AudioRecord] = []
        # time.monotonic() rather than asyncio.get_event_loop().time():
        # the constructor is synchronous and may run before any event loop
        # exists, where get_event_loop() is deprecated / raises on newer
        # Python versions. The default asyncio loops use the same clock.
        self.last_flush_time = time.monotonic()
        self._flush_task: Optional[asyncio.Task] = None
        self._running = False

    async def connect(self):
        """
        Establish the database connection pool and verify it with ``SELECT 1``.

        Raises:
            Exception: whatever pool creation or the probe query raised; the
                error is logged and re-raised. A pool created by this call is
                terminated first so a failed connect does not leak connections.
        """
        new_pool = None
        try:
            new_pool = await asyncpg.create_pool(
                self.db_url, min_size=2, max_size=5, command_timeout=10.0
            )
            self.pool = new_pool
            logger.info("Database connection pool established")
            # Test connection
            async with new_pool.acquire() as conn:
                await conn.fetchval("SELECT 1")
        except Exception as e:
            logger.error("Failed to connect to database: %s", e)
            if new_pool is not None:
                # Only discard the pool this call created; terminate() is
                # synchronous and does not wait on connections.
                if self.pool is new_pool:
                    self.pool = None
                new_pool.terminate()
            raise

    async def close(self):
        """Stop the auto-flush task, flush remaining data, close the pool."""
        self._running = False
        if self._flush_task and not self._flush_task.done():
            self._flush_task.cancel()
            try:
                await self._flush_task
            except asyncio.CancelledError:
                pass
        try:
            await self.flush()
        finally:
            # Close the pool even if the final flush raised unexpectedly.
            if self.pool:
                await self.pool.close()
                logger.info("Database connection closed")

    async def start_auto_flush(self):
        """
        Start the background task for timeout-based flushing.

        Calling this while the task is already running is a no-op, so a
        second call cannot orphan a task that ``close()`` would never cancel.
        """
        self._running = True
        if self._flush_task is not None and not self._flush_task.done():
            return
        self._flush_task = asyncio.create_task(self._auto_flush_loop())

    async def _auto_flush_loop(self):
        """
        Background task: flush once BATCH_TIMEOUT has passed since last flush.

        The loop sleeps only for the time remaining in the current window.
        If a size-triggered flush happened during the sleep, the next wake-up
        is scheduled BATCH_TIMEOUT after that flush, so buffered records never
        wait noticeably longer than BATCH_TIMEOUT.
        """
        delay = self.BATCH_TIMEOUT
        while self._running:
            try:
                await asyncio.sleep(delay)
                # Default to a full window; this also guarantees a positive
                # sleep when flush() returns without touching last_flush_time
                # (e.g. pool not initialised), so the loop cannot spin.
                delay = self.BATCH_TIMEOUT
                elapsed = time.monotonic() - self.last_flush_time
                if elapsed < self.BATCH_TIMEOUT:
                    # Someone flushed while we slept: wait out the remainder.
                    delay = self.BATCH_TIMEOUT - elapsed
                elif self.buffer:
                    await self.flush()
            except asyncio.CancelledError:
                break
            except Exception as e:
                logger.error("Error in auto-flush loop: %s", e)

    async def add_record(self, timestamp_ms: int, rms_db: float, freq_hz: int):
        """
        Add single record to buffer. Flushes if batch size reached.

        Args:
            timestamp_ms: MCU timestamp in milliseconds. Currently unused:
                the MCU counter wraps around, so the record is stamped with
                the server's UTC time instead.
            rms_db: RMS value in dB
            freq_hz: Dominant frequency in Hz
        """
        # Server time is the absolute reference (see timestamp_ms above).
        timestamp = datetime.now(timezone.utc)

        # Detect silence
        is_silence = rms_db < self.SILENCE_THRESHOLD_DB

        self.buffer.append(
            AudioRecord(
                timestamp=timestamp,
                rms_db=rms_db,
                freq_hz=freq_hz,
                is_silence=is_silence,
            )
        )
        # Lazy %-formatting: this runs per sample and debug is usually off.
        logger.debug(
            "Buffered: rms=%.1fdB freq=%sHz silence=%s (buffer=%d)",
            rms_db,
            freq_hz,
            is_silence,
            len(self.buffer),
        )

        # Flush if batch size reached
        if len(self.buffer) >= self.BATCH_SIZE:
            await self.flush()

    async def flush(self):
        """
        Write all buffered records to database.

        Does nothing when the buffer is empty. When the pool has not been
        initialised, an error is logged and the buffer is left untouched.
        Insert failures are logged, not raised, and the affected batch is
        dropped.
        """
        if not self.buffer:
            return
        if not self.pool:
            logger.error("Database pool not initialized, cannot flush")
            return

        # Snapshot and clear before the first await so records added while
        # the insert is in flight go into the next batch.
        records_to_write = self.buffer[:]
        self.buffer.clear()
        self.last_flush_time = time.monotonic()

        try:
            async with self.pool.acquire() as conn:
                # Prepare batch insert
                records = [
                    (r.timestamp, r.rms_db, r.freq_hz, r.is_silence)
                    for r in records_to_write
                ]
                await conn.executemany(
                    """
                    INSERT INTO audio_data (time, rms_db, frequency_hz, is_silence)
                    VALUES ($1, $2, $3, $4)
                    """,
                    records,
                )
                logger.info("Flushed %d records to database", len(records))
        except Exception as e:
            logger.error("Failed to write batch to database: %s", e)
            # The batch is deliberately not re-queued: during a long outage
            # that would grow the buffer without bound. Make the loss visible.
            logger.warning(
                "Dropped %d unwritten audio records", len(records_to_write)
            )