165 lines
5.0 KiB
Python
165 lines
5.0 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
FR-2.3: Database Writer with Batch Processing
|
|
Buffers audio metrics in memory and writes them to the database in batches.
|
|
"""
|
|
|
|
import asyncio
import logging
import time
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import List, Optional, final

import asyncpg
|
|
|
|
# Module-scoped logger for connection, flush, and error messages from DatabaseWriter.
logger = logging.getLogger(name=__name__)
|
|
|
|
|
|
@dataclass
class AudioRecord:
    """A single audio measurement, shaped like one ``audio_data`` row."""

    # When the measurement was buffered (DatabaseWriter.add_record stores the
    # server's UTC time here).
    timestamp: datetime
    # RMS level in dB.
    rms_db: float
    # Dominant frequency in Hz.
    freq_hz: int
    # True when rms_db fell below the writer's silence threshold.
    is_silence: bool
|
|
|
|
|
|
class DatabaseWriter:
    """
    Batched database writer for audio metrics.

    Records are buffered in memory and written to the ``audio_data`` table
    with a single ``executemany`` call. A flush happens when either:

    * the buffer reaches ``BATCH_SIZE`` records (triggered by ``add_record``), or
    * ``BATCH_TIMEOUT`` seconds have passed since the last flush while records
      are pending (triggered by the task started with ``start_auto_flush``).

    Usage: ``connect()`` -> ``start_auto_flush()`` -> ``add_record()`` ... ->
    ``close()``.
    """

    BATCH_SIZE: int = 50  # flush as soon as this many records are buffered
    BATCH_TIMEOUT: float = 5.0  # seconds; max time between flushes while data is pending
    SILENCE_THRESHOLD_DB: float = -30.0  # RMS strictly below this (dB) counts as silence

    def __init__(self, db_url: str):
        """
        Args:
            db_url: Connection string passed to ``asyncpg.create_pool``.
        """
        self.db_url = db_url
        self.pool: Optional[asyncpg.Pool] = None
        self.buffer: List[AudioRecord] = []
        # Monotonic clock: the same source the default asyncio loop uses for
        # loop.time(). Reading it directly avoids asyncio.get_event_loop(),
        # which is deprecated (and later an error) outside a running loop.
        self.last_flush_time: float = time.monotonic()
        self._flush_task: Optional[asyncio.Task] = None
        self._running = False

    async def connect(self):
        """
        Create the connection pool and verify it with ``SELECT 1``.

        The pool is stored on ``self.pool`` only after the check succeeds. If
        pool creation or the check fails, the error is logged, any partially
        created pool is terminated so its connections do not leak, and the
        original exception is re-raised.
        """
        pool = None
        try:
            pool = await asyncpg.create_pool(
                self.db_url, min_size=2, max_size=5, command_timeout=10.0
            )
            # Test connection before publishing the pool.
            async with pool.acquire() as conn:
                await conn.fetchval("SELECT 1")
        except Exception as e:
            logger.error("Failed to connect to database: %s", e)
            if pool is not None:
                # Synchronous and immediate, so it cannot mask the original error
                # by waiting on (or failing in) a graceful shutdown.
                pool.terminate()
            raise

        self.pool = pool
        logger.info("Database connection pool established")

    async def close(self):
        """
        Stop auto-flushing, write any buffered records, and close the pool.

        This is safe to call even if ``connect()`` or ``start_auto_flush()``
        was never called. After closing, ``self.pool`` is reset to ``None``,
        so later ``flush()`` calls report a missing pool instead of using a
        closed one.
        """
        self._running = False

        task, self._flush_task = self._flush_task, None
        if task is not None and not task.done():
            task.cancel()
            try:
                await task
            except asyncio.CancelledError:
                pass

        # If the cancelled task was mid-write, flush() put that batch back into
        # the buffer, so this final flush covers it as well.
        await self.flush()

        if self.pool is not None:
            await self.pool.close()
            self.pool = None
            logger.info("Database connection closed")

    async def start_auto_flush(self):
        """
        Start the background task that performs timeout-based flushes.

        Must be awaited inside a running event loop. Calling it again while the
        task is still running does nothing, so the first task is never orphaned.
        """
        if self._flush_task is not None and not self._flush_task.done():
            return
        self._running = True
        self._flush_task = asyncio.create_task(self._auto_flush_loop())

    async def _auto_flush_loop(self):
        """
        Background task: flush pending records once ``BATCH_TIMEOUT`` seconds
        have elapsed since the previous flush.

        It sleeps only for the time left in the current window. A size-triggered
        flush from ``add_record`` therefore restarts the timer instead of pushing
        the next timed flush out by up to a full extra period.
        """
        while self._running:
            try:
                remaining = self.last_flush_time + self.BATCH_TIMEOUT - time.monotonic()
                # Always sleep. Use a full period when the window has already
                # expired (nothing buffered, or flush() could not run), so the
                # loop never spins without yielding to the event loop.
                await asyncio.sleep(remaining if remaining > 0 else self.BATCH_TIMEOUT)

                elapsed = time.monotonic() - self.last_flush_time
                if self.buffer and elapsed >= self.BATCH_TIMEOUT:
                    await self.flush()

            except asyncio.CancelledError:
                break

            except Exception as e:
                logger.error("Error in auto-flush loop: %s", e)

    async def add_record(self, timestamp_ms: int, rms_db: float, freq_hz: int):
        """
        Add a single record to the buffer, and flush if the batch size is reached.

        Args:
            timestamp_ms: MCU timestamp in milliseconds. It is not stored: the
                MCU counter wraps around, so the server's current UTC time is
                used as the record's absolute timestamp instead.
            rms_db: RMS value in dB. Values below ``SILENCE_THRESHOLD_DB`` are
                flagged as silence.
            freq_hz: Dominant frequency in Hz.
        """
        record = AudioRecord(
            timestamp=datetime.now(timezone.utc),
            rms_db=rms_db,
            freq_hz=freq_hz,
            is_silence=rms_db < self.SILENCE_THRESHOLD_DB,
        )
        self.buffer.append(record)

        # Lazy %-formatting: the message is only built when DEBUG is enabled,
        # which matters because this runs for every incoming sample.
        logger.debug(
            "Buffered: rms=%.1fdB freq=%sHz silence=%s (buffer=%d)",
            rms_db,
            freq_hz,
            record.is_silence,
            len(self.buffer),
        )

        if len(self.buffer) >= self.BATCH_SIZE:
            await self.flush()

    async def flush(self):
        """
        Write all buffered records to the database in one ``executemany`` call.

        The buffer is emptied before the write starts, so records added during
        the write go into the next batch.

        On a database error, the batch is logged and dropped rather than
        re-buffered, which keeps memory bounded during a prolonged outage. If
        the flush is cancelled, the batch is put back at the front of the
        buffer so a later flush (e.g. the one in ``close()``) retries it, and
        the cancellation is re-raised.
        """
        if not self.buffer:
            return

        if self.pool is None:
            logger.error("Database pool not initialized, cannot flush")
            return

        records_to_write = self.buffer[:]
        self.buffer.clear()
        self.last_flush_time = time.monotonic()

        records = [
            (r.timestamp, r.rms_db, r.freq_hz, r.is_silence)
            for r in records_to_write
        ]

        try:
            async with self.pool.acquire() as conn:
                await conn.executemany(
                    """
                    INSERT INTO audio_data (time, rms_db, frequency_hz, is_silence)
                    VALUES ($1, $2, $3, $4)
                    """,
                    records,
                )

        except asyncio.CancelledError:
            # NOTE(review): assumes executemany is all-or-nothing (asyncpg >= 0.22
            # wraps it in an implicit transaction). If the commit landed just
            # before the cancellation, the retry will insert duplicate rows.
            self.buffer[:0] = records_to_write
            raise

        except Exception as e:
            logger.error(
                "Failed to write batch to database: %s (%d records dropped)",
                e,
                len(records_to_write),
            )
            return

        logger.info("Flushed %d records to database", len(records))