137 lines
3.9 KiB
Python
137 lines
3.9 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
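FR-1.4 Data Transmission Protocol Parser (v1)

12-byte binary packet: [0xAA][TYPE=0x02][LEN=8][TIMESTAMP(4)][RMS_DB(2)][FREQ_HZ(2)][CRC8(1)]

Multi-byte fields are little-endian. RMS_DB is a signed value in tenths of a dB.
CRC8 (polynomial 0x07, initial value 0x00) covers TYPE, LEN and the payload;
the 0xAA start byte is not included.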
|
|
"""
|
|
|
|
import struct
|
|
from typing import Optional, NamedTuple
|
|
from dataclasses import dataclass
|
|
|
|
|
|
class AudioMetrics(NamedTuple):
    """Immutable record holding the values decoded from one audio packet.

    The first three fields mirror the packet payload; ``valid`` is an extra
    flag that defaults to ``True`` when not supplied.
    """

    # Value of the packet's 4-byte TIMESTAMP field.
    timestamp_ms: int
    # RMS level in dB (the parser divides the raw 16-bit field by 10).
    rms_db: float
    # Value of the packet's 2-byte FREQ_HZ field.
    freq_hz: int
    # False when the parser found rms_db or freq_hz outside its accepted range.
    valid: bool = True
|
|
|
|
|
|
@dataclass
class ProtocolStats:
    """Running counters maintained by the protocol parser.

    Every counter starts at zero; the parser increments them in place.
    """

    # Packets that passed the CRC check (including ones flagged out of range).
    packets_received: int = 0
    # SOF candidates whose trailing CRC byte did not match the computed CRC.
    crc_errors: int = 0
    # SOF candidates followed by an unexpected TYPE or LEN byte.
    length_errors: int = 0
    # CRC-valid packets whose rms_db or freq_hz fell outside the accepted range.
    range_errors: int = 0
|
|
|
|
|
|
class ProtocolParser:
|
|
"""
|
|
Stream parser for FR-1.4 protocol with automatic resynchronization.
|
|
"""
|
|
|
|
SOF = 0xAA
|
|
TYPE_AUDIO_V1 = 0x02
|
|
PAYLOAD_LEN = 0x08
|
|
PACKET_SIZE = 12
|
|
|
|
def __init__(self):
|
|
self.buffer = bytearray()
|
|
self.stats = ProtocolStats()
|
|
|
|
@staticmethod
|
|
def _crc8_atm(data: bytes) -> int:
|
|
"""CRC-8/ATM: poly=0x07, init=0x00, refin=false, refout=false, xorout=0x00"""
|
|
crc = 0x00
|
|
for byte in data:
|
|
crc ^= byte
|
|
for _ in range(8):
|
|
if crc & 0x80:
|
|
crc = ((crc << 1) ^ 0x07) & 0xFF
|
|
else:
|
|
crc = (crc << 1) & 0xFF
|
|
return crc
|
|
|
|
def feed(self, data: bytes) -> list[AudioMetrics]:
|
|
"""
|
|
Feed incoming bytes, return list of parsed packets.
|
|
|
|
Args:
|
|
data: Raw bytes from serial port
|
|
|
|
Returns:
|
|
List of successfully parsed AudioMetrics
|
|
"""
|
|
self.buffer.extend(data)
|
|
packets = []
|
|
|
|
while len(self.buffer) >= self.PACKET_SIZE:
|
|
# Find SOF
|
|
sof_idx = self.buffer.find(self.SOF)
|
|
if sof_idx == -1:
|
|
# No SOF found, discard all but last byte
|
|
self.buffer = self.buffer[-1:]
|
|
break
|
|
|
|
# Discard bytes before SOF
|
|
if sof_idx > 0:
|
|
self.buffer = self.buffer[sof_idx:]
|
|
|
|
# Need at least 3 bytes for SOF + TYPE + LEN
|
|
if len(self.buffer) < 3:
|
|
break
|
|
|
|
packet_type = self.buffer[1]
|
|
payload_len = self.buffer[2]
|
|
|
|
# Validate TYPE and LEN
|
|
if packet_type != self.TYPE_AUDIO_V1 or payload_len != self.PAYLOAD_LEN:
|
|
self.stats.length_errors += 1
|
|
self.buffer.pop(0) # Remove false SOF, retry
|
|
continue
|
|
|
|
# Full packet size = SOF(1) + TYPE(1) + LEN(1) + PAYLOAD(8) + CRC(1) = 12
|
|
total_len = 3 + payload_len + 1
|
|
if len(self.buffer) < total_len:
|
|
break # Wait for more data
|
|
|
|
packet = bytes(self.buffer[:total_len])
|
|
|
|
# Verify CRC (over bytes 1..10: TYPE, LEN, payload)
|
|
crc_data = packet[1:11]
|
|
expected_crc = packet[11]
|
|
calculated_crc = self._crc8_atm(crc_data)
|
|
|
|
if calculated_crc != expected_crc:
|
|
self.stats.crc_errors += 1
|
|
self.buffer.pop(0) # Remove bad packet, retry
|
|
continue
|
|
|
|
# Parse payload (little-endian)
|
|
timestamp_ms, rms_db_x10, freq_hz = struct.unpack_from('<IhH', packet, 3)
|
|
|
|
# Convert and validate ranges
|
|
rms_db = rms_db_x10 / 10.0
|
|
valid = True
|
|
if not (-40.0 <= rms_db <= 80.0) or not (100 <= freq_hz <= 8000):
|
|
self.stats.range_errors += 1
|
|
valid = False
|
|
|
|
self.stats.packets_received += 1
|
|
packets.append(AudioMetrics(
|
|
timestamp_ms=timestamp_ms,
|
|
rms_db=rms_db,
|
|
freq_hz=freq_hz,
|
|
valid=valid
|
|
))
|
|
|
|
# Remove processed packet
|
|
self.buffer = self.buffer[total_len:]
|
|
|
|
return packets
|
|
|
|
def get_stats(self) -> ProtocolStats:
|
|
"""Get current statistics"""
|
|
return self.stats
|