Files
sound-analyze/services/collector/db_writer.py
2025-12-28 22:25:45 +03:00

165 lines
5.0 KiB
Python

#!/usr/bin/env python3
"""
FR-2.3: Database Writer with Batch Processing
Buffers audio metrics and writes in batches
"""
import asyncio
import logging
import time
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import List, Optional, final

import asyncpg
# Module-level logger, named after this module.
logger = logging.getLogger(name=__name__)
@dataclass(init=True, repr=True, eq=True)
class AudioRecord:
    """A single audio measurement held in DatabaseWriter's buffer."""

    # When the measurement was taken (DatabaseWriter fills in server UTC time).
    timestamp: datetime
    # Signal level in dB.
    rms_db: float
    # Dominant frequency in Hz.
    freq_hz: int
    # True when rms_db is below DatabaseWriter.SILENCE_THRESHOLD_DB.
    is_silence: bool
class DatabaseWriter:
    """
    Batched database writer for audio metrics.
    Flushes on: 50 records OR 5 seconds timeout

    Records are buffered in memory and written to the ``audio_data`` table
    with one ``executemany`` call. ``add_record`` flushes when the buffer
    reaches ``BATCH_SIZE`` records; the background task started by
    ``start_auto_flush`` flushes once ``BATCH_TIMEOUT`` seconds have passed
    since the previous flush. Call ``close`` to write what is left and
    release the connection pool.
    """

    BATCH_SIZE = 50
    BATCH_TIMEOUT: float = 5.0  # seconds
    SILENCE_THRESHOLD_DB: float = -30.0  # dB below = silence

    def __init__(self, db_url: str):
        """
        Args:
            db_url: Connection string passed to ``asyncpg.create_pool``.
        """
        self.db_url = db_url
        self.pool: Optional[asyncpg.Pool] = None
        self.buffer: List[AudioRecord] = []
        # time.monotonic() is the clock asyncio's default loop.time() reads,
        # but unlike asyncio.get_event_loop() it is safe to call with no
        # running event loop (e.g. when the writer is built before
        # asyncio.run()). get_event_loop() is deprecated in that situation
        # and raises from Python 3.14 on.
        self.last_flush_time = time.monotonic()
        self._flush_task: Optional[asyncio.Task] = None
        self._running = False

    async def connect(self):
        """
        Create the connection pool and verify it with ``SELECT 1``.

        ``self.pool`` is only set once the check succeeds.

        Raises:
            Exception: whatever asyncpg raises when the pool cannot be
                created or the check fails (logged, then re-raised).
        """
        pool = None
        try:
            pool = await asyncpg.create_pool(
                self.db_url, min_size=2, max_size=5, command_timeout=10.0
            )
            # Test connection
            async with pool.acquire() as conn:
                await conn.fetchval("SELECT 1")
        except Exception as e:
            logger.error("Failed to connect to database: %s", e)
            if pool is not None:
                # Don't leak the pool's open connections when the check fails.
                pool.terminate()
            raise
        self.pool = pool
        logger.info("Database connection pool established")

    async def close(self):
        """Stop auto-flushing, write any buffered records and close the pool."""
        self._running = False
        if self._flush_task and not self._flush_task.done():
            self._flush_task.cancel()
            try:
                await self._flush_task
            except asyncio.CancelledError:
                pass
        self._flush_task = None
        # If the cancellation above interrupted a write, flush() has put that
        # batch back into the buffer, so this final flush covers it too.
        await self.flush()
        if self.pool:
            await self.pool.close()
            self.pool = None
            logger.info("Database connection closed")

    async def start_auto_flush(self):
        """Start the background task for timeout-based flushing (at most one)."""
        self._running = True
        if self._flush_task is not None and not self._flush_task.done():
            # Already running; a second task would be orphaned, because
            # close() only cancels the task referenced here.
            return
        self._flush_task = asyncio.create_task(self._auto_flush_loop())

    async def _auto_flush_loop(self):
        """
        Background task: flush the buffer once BATCH_TIMEOUT seconds have
        passed since the last flush, whether that flush was size- or
        timeout-triggered.
        """
        delay = self.BATCH_TIMEOUT
        while self._running:
            try:
                await asyncio.sleep(delay)
                elapsed = time.monotonic() - self.last_flush_time
                if self.buffer and elapsed >= self.BATCH_TIMEOUT:
                    await self.flush()
                # Wake when the current window closes instead of a full period
                # later. Otherwise a size-triggered flush in the middle of a
                # sleep lets records wait up to twice BATCH_TIMEOUT. If the
                # window has already closed (empty buffer, or flush() could not
                # run), wait a full period so the loop never spins.
                delay = self.BATCH_TIMEOUT - (time.monotonic() - self.last_flush_time)
                if delay <= 0:
                    delay = self.BATCH_TIMEOUT
            except asyncio.CancelledError:
                break
            except Exception as e:
                logger.error("Error in auto-flush loop: %s", e)

    async def add_record(self, timestamp_ms: int, rms_db: float, freq_hz: int):
        """
        Add single record to buffer. Flushes if batch size reached.

        Args:
            timestamp_ms: MCU timestamp in milliseconds. Not stored: the MCU
                counter wraps around, so the server's UTC time at arrival
                is recorded instead.
            rms_db: RMS value in dB
            freq_hz: Dominant frequency in Hz
        """
        # The MCU timestamp wraps around, so server time is the absolute
        # reference; timestamp_ms is intentionally unused.
        timestamp = datetime.now(timezone.utc)
        is_silence = rms_db < self.SILENCE_THRESHOLD_DB
        self.buffer.append(
            AudioRecord(
                timestamp=timestamp,
                rms_db=rms_db,
                freq_hz=freq_hz,
                is_silence=is_silence,
            )
        )
        # Lazy %-args: this runs for every record, and the message is only
        # formatted when DEBUG logging is enabled.
        logger.debug(
            "Buffered: rms=%.1fdB freq=%sHz silence=%s (buffer=%d)",
            rms_db,
            freq_hz,
            is_silence,
            len(self.buffer),
        )
        if len(self.buffer) >= self.BATCH_SIZE:
            await self.flush()

    async def flush(self):
        """
        Write all buffered records to the database in one batch.

        The buffer is emptied before the first await, so records added while
        the write is in flight go into the next batch. If the write raises,
        the batch is logged and dropped rather than retried. If the write is
        cancelled (e.g. by close() cancelling the auto-flush task), the batch
        is put back at the front of the buffer and the cancellation is
        re-raised.
        """
        if not self.buffer:
            return
        if not self.pool:
            logger.error("Database pool not initialized, cannot flush")
            return
        records_to_write = self.buffer[:]
        self.buffer.clear()
        self.last_flush_time = time.monotonic()
        try:
            records = [
                (r.timestamp, r.rms_db, r.freq_hz, r.is_silence)
                for r in records_to_write
            ]
            async with self.pool.acquire() as conn:
                await conn.executemany(
                    "INSERT INTO audio_data (time, rms_db, frequency_hz, is_silence) "
                    "VALUES ($1, $2, $3, $4)",
                    records,
                )
        except asyncio.CancelledError:
            # Keep the interrupted batch (ahead of newer records) so a later
            # flush, such as the final one in close(), still writes it.
            # NOTE(review): if the cancel lands after the server committed,
            # those rows could be written twice.
            self.buffer[:0] = records_to_write
            raise
        except Exception as e:
            # Dropped on purpose: re-queueing failed batches while the database
            # is down would grow the buffer without bound, and would make every
            # add_record call retry the write.
            logger.error("Failed to write batch to database: %s", e)
        else:
            logger.info("Flushed %d records to database", len(records))