66 lines
1.8 KiB
Python
66 lines
1.8 KiB
Python
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import time
|
|
from dataclasses import dataclass
|
|
from typing import Any
|
|
|
|
from fastapi import WebSocket
|
|
|
|
|
|
@dataclass(slots=True)
|
|
class ClientConn:
|
|
ws: WebSocket
|
|
hz: int
|
|
min_interval: float
|
|
last_sent_monotonic: float
|
|
|
|
|
|
class ConnectionManager:
|
|
def __init__(self) -> None:
|
|
self._conns: dict[WebSocket, ClientConn] = {}
|
|
self._lock = asyncio.Lock()
|
|
|
|
async def connect(self, ws: WebSocket, hz: int) -> None:
|
|
await ws.accept()
|
|
now = time.monotonic()
|
|
client = ClientConn(
|
|
ws=ws, hz=hz, min_interval=1.0 / hz, last_sent_monotonic=0.0
|
|
)
|
|
|
|
async with self._lock:
|
|
self._conns[ws] = client
|
|
|
|
# Небольшой лог (можно заменить на structlog/loguru)
|
|
print(f"[ws] connected client={id(ws)} hz={hz} at={now:.3f}")
|
|
|
|
async def disconnect(self, ws: WebSocket) -> None:
|
|
async with self._lock:
|
|
existed = ws in self._conns
|
|
self._conns.pop(ws, None)
|
|
if existed:
|
|
print(f"[ws] disconnected client={id(ws)}")
|
|
|
|
async def broadcast_json(self, payload: dict[str, Any]) -> None:
|
|
now = time.monotonic()
|
|
|
|
async with self._lock:
|
|
clients = list(self._conns.values())
|
|
|
|
to_remove: list[WebSocket] = []
|
|
for c in clients:
|
|
# throttling per connection
|
|
if c.last_sent_monotonic and (now - c.last_sent_monotonic) < c.min_interval:
|
|
continue
|
|
|
|
try:
|
|
await c.ws.send_json(payload)
|
|
c.last_sent_monotonic = now
|
|
except Exception:
|
|
to_remove.append(c.ws)
|
|
|
|
if to_remove:
|
|
async with self._lock:
|
|
for ws in to_remove:
|
|
self._conns.pop(ws, None)
|