import { create } from "zustand";

import type { AudioSample } from "../../../entities/audioSample/model/types";
import { env } from "../../../shared/config/env";
import { RingBuffer } from "../../../shared/lib/ringBuffer";
import type { WsStatus } from "./types";
import { LiveWsClient } from "../lib/liveWsClient";
import { parseAndValidateMessage } from "../lib/parseAndValidate";

/** State and actions exposed by the live-stream store. */
type LiveStreamState = {
  status: WsStatus;
  /** Wall-clock time (Date.now()) of the last valid WS sample, or null if none arrived yet. */
  lastMessageAt: number | null;

  // WS frequency control
  requestedHz: number; // 0.1-60
  setRequestedHz: (hz: number) => void;

  // Window selection
  windowMs: number;
  setWindowMs: (ms: number) => void;

  // Derived UI state (throttled)
  latest: AudioSample | null;
  peakHoldDb3s: number | null;
  chartHistory: AudioSample[];

  connect: () => void;
  disconnect: () => void;
  loadLatest: (limit?: number) => Promise<void>;
};

const PEAK_WINDOW_MS = 3_000;

const WINDOW_OPTIONS_MS = [15_000, 30_000, 60_000, 120_000, 300_000] as const;
const DEFAULT_WINDOW_MS = 60_000;

const MIN_HZ = 0.1;
const MAX_HZ = 60;
const DEFAULT_REQUESTED_HZ = 10;

// If there are more than 600 points in selected window -> downsample
const MAX_CHART_POINTS = 600;

// UI updates not more than ~12 Hz
const UI_FLUSH_HZ = 12;
const UI_FLUSH_MS = Math.round(1000 / UI_FLUSH_HZ);

// Max window is 5m; worst-case 60 Hz => 18k points (+ headroom)
const RAW_CAPACITY = 20_000;

// -------- Module-level raw buffers (NOT in Zustand state) --------
const rawHistory = new RingBuffer<AudioSample>(RAW_CAPACITY);
let peakWindow: AudioSample[] = [];

let client: LiveWsClient | null = null;
let flushTimer: number | null = null;
let lastSeenSample: AudioSample | null = null;
// Recorded when a valid WS sample arrives; flushToUi only publishes it.
let lastMessageAtMs: number | null = null;

/**
 * Clamps `v` into `[min, max]`.
 *
 * @param v - Value to clamp.
 * @param min - Lower bound; also the fallback for NaN / ±Infinity input.
 * @param max - Upper bound (unbounded by default).
 * @returns `min` for non-finite input, otherwise `v` limited to the range.
 */
function clampFinite(
  v: number,
  min: number,
  max: number = Number.POSITIVE_INFINITY,
): number {
  if (!Number.isFinite(v)) return min;
  return Math.min(max, Math.max(min, v));
}

/** Type guard: true when `ms` is one of the values in WINDOW_OPTIONS_MS. */
function isAllowedWindowMs(
  ms: number,
): ms is (typeof WINDOW_OPTIONS_MS)[number] {
  return (WINDOW_OPTIONS_MS as readonly number[]).includes(ms);
}

/**
 * Returns `base` with its `hz` query parameter set to the requested rate,
 * clamped to [MIN_HZ, MAX_HZ].
 *
 * @param base - WebSocket endpoint; may already carry query parameters.
 * @param hz - Requested update rate.
 */
function buildWsUrl(base: string, hz: number): string {
  const safeHz = clampFinite(hz, MIN_HZ, MAX_HZ);

  // Prefer URL() for correctness (keeps existing params)
  try {
    const u = new URL(base);
    u.searchParams.set("hz", String(safeHz));
    return u.toString();
  } catch {
    // `base` is not an absolute URL: append the parameter by hand.
    const sep = base.includes("?") ? "&" : "?";
    return `${base}${sep}hz=${encodeURIComponent(String(safeHz))}`;
  }
}

/**
 * Drops samples from the head of `peakWindow` that are more than
 * PEAK_WINDOW_MS older than `nowSampleMs`, then applies a hard size cap.
 *
 * @param nowSampleMs - `timeMs` of the sample that was just appended.
 */
function trimPeakWindow(nowSampleMs: number): void {
  const cutoff = nowSampleMs - PEAK_WINDOW_MS;

  // Only the leading run of stale samples is removed (same as shifting the head).
  const firstFresh = peakWindow.findIndex((s) => !(s.timeMs < cutoff));
  if (firstFresh === -1) peakWindow.length = 0;
  else if (firstFresh > 0) peakWindow.splice(0, firstFresh);

  // Safety cap: even at 60 Hz, 3 sec ~ 180 points; allow some jitter
  if (peakWindow.length > 512) peakWindow = peakWindow.slice(-512);
}

/**
 * @returns The largest `rms_db` currently in `peakWindow`, or null when the
 * window is empty or the maximum is not a finite number.
 */
function computePeakDb3s(): number | null {
  if (!peakWindow.length) return null;
  let max = -Infinity;
  for (const s of peakWindow) max = Math.max(max, s.rms_db);
  return Number.isFinite(max) ? max : null;
}

/**
 * Builds the chart series for the selected window: samples no older than
 * `windowMs` relative to the newest known sample, in ascending `timeMs`
 * order, downsampled by a fixed stride when there are more than
 * MAX_CHART_POINTS of them.
 *
 * @param windowMs - Width of the visible window in milliseconds.
 * @returns A new array; empty when no sample has been seen yet.
 */
function makeChartHistory(windowMs: number): AudioSample[] {
  const latest = lastSeenSample ?? rawHistory.last();
  if (!latest) return [];

  const cutoff = latest.timeMs - windowMs;

  // Note: rawHistory.toArray() allocates, but it's called at ~12 Hz, not per WS packet
  const windowed = rawHistory.toArray().filter((s) => s.timeMs >= cutoff);

  // REST history pushed after live samples leaves the buffer out of time
  // order; sort only when that is actually the case (`windowed` is a fresh
  // array from filter(), so sorting in place is safe).
  let ordered = true;
  let prevMs = -Infinity;
  for (const s of windowed) {
    if (s.timeMs < prevMs) {
      ordered = false;
      break;
    }
    prevMs = s.timeMs;
  }
  if (!ordered) windowed.sort((a, b) => a.timeMs - b.timeMs);

  if (windowed.length <= MAX_CHART_POINTS) return windowed;

  const step = Math.ceil(windowed.length / MAX_CHART_POINTS);
  const out = windowed.filter((_, i) => i % step === 0);

  // Ensure last point is present (prevents “missing tail” effect)
  const last = windowed.at(-1);
  if (last && out.at(-1)?.timeMs !== last.timeMs) out.push(last);

  return out;
}

/**
 * Live audio stream store.
 *
 * Raw samples are kept in module-level buffers outside of Zustand state;
 * the store only publishes derived values (`latest`, `peakHoldDb3s`,
 * `chartHistory`) through a flush that is throttled to UI_FLUSH_MS.
 */
export const useLiveStreamStore = create<LiveStreamState>()((set, get) => {
  /** Cancels the pending throttled flush, if any. */
  function clearFlushTimer(): void {
    if (flushTimer !== null) {
      window.clearTimeout(flushTimer);
      flushTimer = null;
    }
  }

  /** Recomputes the derived UI state from the raw buffers and publishes it. */
  function flushToUi(): void {
    clearFlushTimer();

    const { windowMs } = get();
    const latest = lastSeenSample ?? rawHistory.last();

    set({
      latest: latest ?? null,
      peakHoldDb3s: computePeakDb3s(),
      chartHistory: makeChartHistory(windowMs),
      // Published, not stamped here: flushes triggered by a window change or
      // a REST warm-up must not look like a freshly received message.
      lastMessageAt: lastMessageAtMs,
    });
  }

  /** Schedules one flush UI_FLUSH_MS from now unless one is already pending. */
  function scheduleFlush(): void {
    if (flushTimer !== null) return;
    flushTimer = window.setTimeout(flushToUi, UI_FLUSH_MS);
  }

  /** Creates and connects the WS client for the current `requestedHz` if none exists. */
  function ensureClientConnected(): void {
    if (client) return;

    const url = buildWsUrl(env.wsUrl, get().requestedHz);

    client = new LiveWsClient(url, {
      onStatus: (st) => set({ status: st }),
      onMessage: (data) => {
        const parsed = parseAndValidateMessage(data);
        if (!parsed.ok) return;

        const sample = parsed.sample;
        lastMessageAtMs = Date.now();
        lastSeenSample = sample;

        rawHistory.push(sample);
        peakWindow.push(sample);
        trimPeakWindow(sample.timeMs);

        // Throttled UI update
        scheduleFlush();
      },
    });

    client.connect();
  }

  /** Replaces the active client so the new `hz` query parameter takes effect. */
  function reconnectWithNewHz(): void {
    if (!client) return;
    client.close();
    client = null;
    ensureClientConnected();
  }

  return {
    status: "disconnected",
    lastMessageAt: null,

    requestedHz: DEFAULT_REQUESTED_HZ,
    setRequestedHz: (hz) => {
      const next = clampFinite(hz, MIN_HZ, MAX_HZ);
      if (next === get().requestedHz) return;

      set({ requestedHz: next });
      reconnectWithNewHz();
    },

    windowMs: DEFAULT_WINDOW_MS,
    setWindowMs: (ms) => {
      // Values outside WINDOW_OPTIONS_MS fall back to the default window.
      const next = isAllowedWindowMs(ms) ? ms : DEFAULT_WINDOW_MS;
      if (next === get().windowMs) return;

      set({ windowMs: next });

      // Recompute immediately (doesn't touch WS)
      flushToUi();
    },

    latest: null,
    peakHoldDb3s: null,
    chartHistory: [],

    connect: () => {
      ensureClientConnected();
    },

    disconnect: () => {
      clearFlushTimer();
      client?.close();
      client = null;
      set({ status: "disconnected" });
      // Raw history intentionally preserved to avoid “jumping” on reconnect
    },

    loadLatest: async (limit = 300) => {
      // The limit goes into the query string: keep it a whole number >= 1.
      const safeLimit = Math.floor(clampFinite(limit, 1));
      const base = env.apiUrl.replace(/\/$/, "");
      const url = `${base}/api/v1/audio/latest?limit=${safeLimit}`;

      try {
        const res = await fetch(url);
        if (!res.ok) return;

        const payload: unknown = await res.json();
        if (!Array.isArray(payload)) return;

        // Validate through the same parser as WS messages.
        const samples: AudioSample[] = [];
        for (const item of payload) {
          const parsed = parseAndValidateMessage(JSON.stringify(item));
          if (parsed.ok) samples.push(parsed.sample);
        }

        // Do not rely on the order the endpoint returns items in.
        samples.sort((a, b) => a.timeMs - b.timeMs);

        // Push historical points into raw buffer (no per-item setState!)
        for (const sample of samples) {
          rawHistory.push(sample);

          // Older than what the live feed already delivered: keep it for the
          // chart only, so `latest` and the time-ordered peak window never
          // move backwards.
          if (lastSeenSample !== null && sample.timeMs < lastSeenSample.timeMs) {
            continue;
          }

          lastSeenSample = sample;

          // Warm-up peak window too (optional but consistent)
          peakWindow.push(sample);
          trimPeakWindow(sample.timeMs);
        }

        // Single UI update after warm-up
        flushToUi();
      } catch {
        // graceful fallback: ignore (dashboard must stay usable without REST)
      }
    },
  };
});