feat(frontend): add websocket send speed packet control

This commit is contained in:
2025-12-28 22:25:14 +03:00
parent e6f361def4
commit c560b9be76
3 changed files with 285 additions and 75 deletions

View File

@@ -1,6 +1,7 @@
import { create } from "zustand";
import type { AudioSample } from "../../../entities/audioSample/model/types";
import { env } from "../../../shared/config/env";
import { RingBuffer } from "../../../shared/lib/ringBuffer";
import type { WsStatus } from "./types";
import { LiveWsClient } from "../lib/liveWsClient";
import { parseAndValidateMessage } from "../lib/parseAndValidate";
@@ -9,114 +10,249 @@ type LiveStreamState = {
status: WsStatus;
lastMessageAt: number | null;
latest: AudioSample | null;
// WS frequency control
requestedHz: number; // 1..60
setRequestedHz: (hz: number) => void;
history60s: AudioSample[];
// Window selection
windowMs: number;
setWindowMs: (ms: number) => void;
// Derived UI state (throttled)
latest: AudioSample | null;
peakHoldDb3s: number | null;
chartHistory: AudioSample[];
connect: () => void;
disconnect: () => void;
loadLatest: (limit?: number) => Promise<void>;
};
const WINDOW_MS = 60_000;
const PEAK_WINDOW_MS = 3_000;
// safety cap для worst-case 60Hz: ~3600 точек за 60 секунд
// (нужен на случай странных таймстампов/скачков времени)
const MAX_POINTS_60S = 60 * 60;
const WINDOW_OPTIONS_MS = [15_000, 30_000, 60_000, 120_000, 300_000] as const;
const DEFAULT_WINDOW_MS = 60_000;
let historyWindow: AudioSample[] = [];
const MIN_HZ = 1;
const MAX_HZ = 60;
const DEFAULT_REQUESTED_HZ = 10;
// If there are more than 600 points in selected window -> downsample
const MAX_CHART_POINTS = 600;
// UI updates not more than ~12 Hz (1015 Hz recommended)
const UI_FLUSH_HZ = 12;
const UI_FLUSH_MS = Math.round(1000 / UI_FLUSH_HZ);
// Max window is 5m; worst-case 60 Hz => 18k points (+ headroom)
const RAW_CAPACITY = 20_000;
// -------- Module-level raw buffers (NOT in Zustand state) --------
const rawHistory = new RingBuffer<AudioSample>(RAW_CAPACITY);
let peakWindow: AudioSample[] = [];
let client: LiveWsClient | null = null;
function trimByTime(arr: AudioSample[], cutoffMs: number): AudioSample[] {
// shift в цикле ок для ~3600 элементов; если захочешь — можно оптимизировать индексом
while (arr.length && arr[0]!.timeMs < cutoffMs) arr.shift();
if (arr.length > MAX_POINTS_60S) arr = arr.slice(-MAX_POINTS_60S);
return arr;
let client: LiveWsClient | null = null;
let flushTimer: number | null = null;
let lastSeenSample: AudioSample | null = null;
function clampInt(v: number, min: number, max: number): number {
if (!Number.isFinite(v)) return min;
return Math.max(min, Math.min(max, Math.trunc(v)));
}
function computePeakDb(windowSamples: AudioSample[]): number | null {
if (windowSamples.length === 0) return null;
function isAllowedWindowMs(ms: number): ms is (typeof WINDOW_OPTIONS_MS)[number] {
return (WINDOW_OPTIONS_MS as readonly number[]).includes(ms);
}
function buildWsUrl(base: string, hz: number): string {
const safeHz = clampInt(hz, MIN_HZ, MAX_HZ);
// Prefer URL() for correctness (keeps existing params)
try {
const u = new URL(base);
u.searchParams.set("hz", String(safeHz));
return u.toString();
} catch {
const sep = base.includes("?") ? "&" : "?";
return `${base}${sep}hz=${encodeURIComponent(String(safeHz))}`;
}
}
function trimPeakWindow(nowSampleMs: number): void {
const cutoff = nowSampleMs - PEAK_WINDOW_MS;
while (peakWindow.length && peakWindow[0]!.timeMs < cutoff) peakWindow.shift();
// Safety cap: even at 60 Hz, 3 sec ~ 180 points; allow some jitter
if (peakWindow.length > 512) peakWindow = peakWindow.slice(-512);
}
function computePeakDb3s(): number | null {
if (!peakWindow.length) return null;
let max = -Infinity;
for (const s of windowSamples) max = Math.max(max, s.rms_db);
for (const s of peakWindow) max = Math.max(max, s.rms_db);
return Number.isFinite(max) ? max : null;
}
export const useLiveStreamStore = create<LiveStreamState>()((set) => ({
status: "disconnected",
lastMessageAt: null,
function makeChartHistory(windowMs: number): AudioSample[] {
const latest = lastSeenSample ?? rawHistory.last();
if (!latest) return [];
latest: null,
history60s: [],
peakHoldDb3s: null,
const cutoff = latest.timeMs - windowMs;
connect: () => {
// Note: rawHistory.toArray() allocates, but it's called at ~12 Hz, not per WS packet
const windowed = rawHistory.toArray().filter((s) => s.timeMs >= cutoff);
if (windowed.length <= MAX_CHART_POINTS) return windowed;
const step = Math.ceil(windowed.length / MAX_CHART_POINTS);
const out: AudioSample[] = [];
for (let i = 0; i < windowed.length; i += step) out.push(windowed[i]!);
// Ensure last point is present (prevents “missing tail” effect)
const last = windowed.at(-1);
if (last && out.at(-1)?.timeMs !== last.timeMs) out.push(last);
return out;
}
export const useLiveStreamStore = create<LiveStreamState>()((set, get) => {
function clearFlushTimer(): void {
if (flushTimer !== null) {
window.clearTimeout(flushTimer);
flushTimer = null;
}
}
function flushToUi(): void {
clearFlushTimer();
const { windowMs } = get();
const latest = lastSeenSample ?? rawHistory.last();
const chartHistory = makeChartHistory(windowMs);
set({
latest: latest ?? null,
peakHoldDb3s: computePeakDb3s(),
chartHistory,
lastMessageAt: Date.now(),
});
}
function scheduleFlush(): void {
if (flushTimer !== null) return;
flushTimer = window.setTimeout(flushToUi, UI_FLUSH_MS);
}
function ensureClientConnected(): void {
if (client) return;
client = new LiveWsClient(env.wsUrl, {
const hz = get().requestedHz;
const url = buildWsUrl(env.wsUrl, hz);
client = new LiveWsClient(url, {
onStatus: (st) => set({ status: st }),
onMessage: (data) => {
const parsed = parseAndValidateMessage(data);
if (!parsed.ok) return;
const sample = parsed.sample;
lastSeenSample = sample;
// 60s time window (по времени, а не по "600 точек")
historyWindow.push(sample);
historyWindow = trimByTime(historyWindow, sample.timeMs - WINDOW_MS);
rawHistory.push(sample);
// 3s peak-hold window (тоже по времени сэмпла)
peakWindow.push(sample);
peakWindow = trimByTime(peakWindow, sample.timeMs - PEAK_WINDOW_MS);
trimPeakWindow(sample.timeMs);
set({
latest: sample,
history60s: historyWindow.slice(), // копия для UI
peakHoldDb3s: computePeakDb(peakWindow),
lastMessageAt: Date.now(),
});
// Throttled UI update (important for Recharts + overall UI smoothness)
scheduleFlush();
},
});
client.connect();
},
}
disconnect: () => {
client?.close();
function reconnectWithNewHz(): void {
if (!client) return;
client.close();
client = null;
peakWindow = [];
// historyWindow оставляем: график не должен "прыгать" при reconnect
set({ status: "disconnected" });
},
ensureClientConnected();
}
loadLatest: async (limit = 100) => {
const url = `${env.apiUrl.replace(/\/$/, "")}/api/v1/audio/latest?limit=${limit}`;
try {
const res = await fetch(url);
if (!res.ok) return;
return {
status: "disconnected",
lastMessageAt: null,
const raw = (await res.json()) as Array<{
time: string;
rms_db: number;
freq_hz: number;
}>;
requestedHz: DEFAULT_REQUESTED_HZ,
setRequestedHz: (hz) => {
const next = clampInt(hz, MIN_HZ, MAX_HZ);
const prev = get().requestedHz;
if (next === prev) return;
for (const item of raw) {
const parsed = parseAndValidateMessage(JSON.stringify(item));
if (!parsed.ok) continue;
historyWindow.push(parsed.sample);
set({ requestedHz: next });
reconnectWithNewHz();
},
windowMs: DEFAULT_WINDOW_MS,
setWindowMs: (ms) => {
const next = isAllowedWindowMs(ms) ? ms : DEFAULT_WINDOW_MS;
if (next === get().windowMs) return;
set({ windowMs: next });
// Recompute immediately (doesn't touch WS)
flushToUi();
},
latest: null,
peakHoldDb3s: null,
chartHistory: [],
connect: () => {
ensureClientConnected();
},
disconnect: () => {
clearFlushTimer();
client?.close();
client = null;
set({ status: "disconnected" });
// Raw history intentionally preserved to avoid “jumping” on reconnect
},
loadLatest: async (limit = 300) => {
const safeLimit = clampInt(limit, 1, 5000);
const base = env.apiUrl.replace(/\/$/, "");
const url = `${base}/api/v1/audio/latest?limit=${safeLimit}`;
try {
const res = await fetch(url);
if (!res.ok) return;
const raw = (await res.json()) as Array<{
time: string;
rms_db: number;
freq_hz: number;
}>;
// Push historical points into raw buffer (no per-item setState!)
for (const item of raw) {
const parsed = parseAndValidateMessage(JSON.stringify(item));
if (!parsed.ok) continue;
rawHistory.push(parsed.sample);
lastSeenSample = parsed.sample;
// Warm-up peak window too (optional but consistent)
peakWindow.push(parsed.sample);
trimPeakWindow(parsed.sample.timeMs);
}
// Single UI update after warm-up
flushToUi();
} catch {
// graceful fallback: ignore (dashboard must stay usable without REST)
}
},
};
});
// после warm-up тоже режем по времени "последней точки"
const last = historyWindow.at(-1);
if (last)
historyWindow = trimByTime(historyWindow, last.timeMs - WINDOW_MS);
set({ history60s: historyWindow.slice() });
} catch {
// ignore
}
},
}));