feat(collector): add collector service
This commit is contained in:
165
services/collector/db_writer.py
Normal file
165
services/collector/db_writer.py
Normal file
@@ -0,0 +1,165 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
FR-2.3: Database Writer with Batch Processing
|
||||
Buffers audio metrics and writes in batches (50 records or 5 seconds)
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import logging
|
||||
from datetime import datetime, timezone
|
||||
from typing import List, Optional
|
||||
from dataclasses import dataclass
|
||||
|
||||
import asyncpg
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@dataclass
|
||||
class AudioRecord:
|
||||
"""Single audio measurement record"""
|
||||
|
||||
timestamp: datetime
|
||||
rms_db: float
|
||||
freq_hz: int
|
||||
is_silence: bool
|
||||
|
||||
|
||||
class DatabaseWriter:
|
||||
"""
|
||||
Batched database writer for audio metrics.
|
||||
Flushes on: 50 records OR 5 seconds timeout
|
||||
"""
|
||||
|
||||
BATCH_SIZE = 50
|
||||
BATCH_TIMEOUT = 5.0 # seconds
|
||||
SILENCE_THRESHOLD_DB = -30.0 # dB below = silence
|
||||
|
||||
def __init__(self, db_url: str):
|
||||
self.db_url = db_url
|
||||
self.pool: Optional[asyncpg.Pool] = None
|
||||
self.buffer: List[AudioRecord] = []
|
||||
self.last_flush_time = asyncio.get_event_loop().time()
|
||||
self._flush_task: Optional[asyncio.Task] = None
|
||||
self._running = False
|
||||
|
||||
async def connect(self):
|
||||
"""Establish database connection pool"""
|
||||
try:
|
||||
self.pool = await asyncpg.create_pool(
|
||||
self.db_url, min_size=2, max_size=5, command_timeout=10.0
|
||||
)
|
||||
logger.info("Database connection pool established")
|
||||
|
||||
# Test connection
|
||||
async with self.pool.acquire() as conn:
|
||||
await conn.fetchval("SELECT 1")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to connect to database: {e}")
|
||||
raise
|
||||
|
||||
async def close(self):
|
||||
"""Close database connection and flush remaining data"""
|
||||
self._running = False
|
||||
|
||||
if self._flush_task and not self._flush_task.done():
|
||||
self._flush_task.cancel()
|
||||
try:
|
||||
await self._flush_task
|
||||
except asyncio.CancelledError:
|
||||
pass
|
||||
|
||||
await self.flush()
|
||||
|
||||
if self.pool:
|
||||
await self.pool.close()
|
||||
logger.info("Database connection closed")
|
||||
|
||||
async def start_auto_flush(self):
|
||||
"""Start background task for timeout-based flushing"""
|
||||
self._running = True
|
||||
self._flush_task = asyncio.create_task(self._auto_flush_loop())
|
||||
|
||||
async def _auto_flush_loop(self):
|
||||
"""Background task: flush buffer every BATCH_TIMEOUT seconds"""
|
||||
while self._running:
|
||||
try:
|
||||
await asyncio.sleep(self.BATCH_TIMEOUT)
|
||||
|
||||
current_time = asyncio.get_event_loop().time()
|
||||
if self.buffer and (
|
||||
current_time - self.last_flush_time >= self.BATCH_TIMEOUT
|
||||
):
|
||||
await self.flush()
|
||||
|
||||
except asyncio.CancelledError:
|
||||
break
|
||||
except Exception as e:
|
||||
logger.error(f"Error in auto-flush loop: {e}")
|
||||
|
||||
async def add_record(self, timestamp_ms: int, rms_db: float, freq_hz: int):
|
||||
"""
|
||||
Add single record to buffer. Flushes if batch size reached.
|
||||
|
||||
Args:
|
||||
timestamp_ms: MCU timestamp in milliseconds
|
||||
rms_db: RMS value in dB
|
||||
freq_hz: Dominant frequency in Hz
|
||||
"""
|
||||
# Convert timestamp to datetime (use current time with ms offset)
|
||||
# Note: MCU timestamp wraps around, use server time for absolute reference
|
||||
timestamp = datetime.now(timezone.utc)
|
||||
|
||||
# Detect silence
|
||||
is_silence = rms_db < self.SILENCE_THRESHOLD_DB
|
||||
|
||||
record = AudioRecord(
|
||||
timestamp=timestamp, rms_db=rms_db, freq_hz=freq_hz, is_silence=is_silence
|
||||
)
|
||||
|
||||
self.buffer.append(record)
|
||||
logger.debug(
|
||||
f"Buffered: rms={rms_db:.1f}dB freq={freq_hz}Hz "
|
||||
f"silence={is_silence} (buffer={len(self.buffer)})"
|
||||
)
|
||||
|
||||
# Flush if batch size reached
|
||||
if len(self.buffer) >= self.BATCH_SIZE:
|
||||
await self.flush()
|
||||
|
||||
async def flush(self):
|
||||
"""Write all buffered records to database"""
|
||||
if not self.buffer:
|
||||
return
|
||||
|
||||
if not self.pool:
|
||||
logger.error("Database pool not initialized, cannot flush")
|
||||
return
|
||||
|
||||
records_to_write = self.buffer[:]
|
||||
self.buffer.clear()
|
||||
self.last_flush_time = asyncio.get_event_loop().time()
|
||||
|
||||
try:
|
||||
async with self.pool.acquire() as conn:
|
||||
# Prepare batch insert
|
||||
records = [
|
||||
(r.timestamp, r.rms_db, r.freq_hz, r.is_silence)
|
||||
for r in records_to_write
|
||||
]
|
||||
|
||||
await conn.executemany(
|
||||
"""
|
||||
INSERT INTO audio_data (time, rms_db, frequency_hz, is_silence)
|
||||
VALUES ($1, $2, $3, $4)
|
||||
""",
|
||||
records,
|
||||
)
|
||||
|
||||
logger.info(f"Flushed {len(records)} records to database")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to write batch to database: {e}")
|
||||
# Re-add to buffer for retry (optional, could cause memory issues)
|
||||
# self.buffer.extend(records_to_write)
|
||||
Reference in New Issue
Block a user