#!/usr/bin/env python3
"""
FR-1.4 Data Transmission Protocol Parser (v1)

12-byte binary packet:
[0xAA][TYPE=0x02][LEN=8][TIMESTAMP(4)][RMS_DB(2)][FREQ_HZ(2)][CRC8(1)]
"""

import struct
from typing import Optional, NamedTuple
from dataclasses import dataclass


class AudioMetrics(NamedTuple):
    """Parsed audio metrics packet"""

    timestamp_ms: int
    rms_db: float
    freq_hz: int
    valid: bool = True


@dataclass
class ProtocolStats:
    """Protocol statistics"""

    packets_received: int = 0
    crc_errors: int = 0
    length_errors: int = 0
    range_errors: int = 0


class ProtocolParser:
    """
    Stream parser for FR-1.4 protocol with automatic resynchronization.

    Bytes are accumulated in ``self.buffer``; ``feed`` extracts every complete,
    CRC-valid packet and skips one byte at a time past anything that fails
    validation, so a corrupted stream re-locks on the next real start-of-frame.
    """

    SOF = 0xAA
    TYPE_AUDIO_V1 = 0x02
    PAYLOAD_LEN = 0x08
    PACKET_SIZE = 12

    # SOF + TYPE + LEN precede the payload.
    _HEADER_LEN = 3
    # Little-endian payload: TIMESTAMP(4) RMS_DB(2) FREQ_HZ(2).
    # NOTE(review): the format string was truncated in the source this file was
    # recovered from. The RMS field is assumed to be a *signed* 16-bit value
    # (dB readings are commonly negative) - confirm against the FR-1.4 spec.
    _PAYLOAD = struct.Struct("<IhH")

    def __init__(self):
        self.buffer = bytearray()
        self.stats = ProtocolStats()

    @staticmethod
    def _crc8_atm(data: bytes) -> int:
        """CRC-8/ATM: poly=0x07, init=0x00, refin=false, refout=false, xorout=0x00"""
        crc = 0x00
        for byte in data:
            crc ^= byte
            for _ in range(8):
                if crc & 0x80:
                    crc = ((crc << 1) ^ 0x07) & 0xFF
                else:
                    crc = (crc << 1) & 0xFF
        return crc

    def feed(self, data: bytes) -> list[AudioMetrics]:
        """
        Feed incoming bytes, return list of parsed packets.

        Args:
            data: Raw bytes from serial port

        Returns:
            List of successfully parsed AudioMetrics
        """
        buf = self.buffer
        buf.extend(data)
        packets: list[AudioMetrics] = []

        while len(buf) >= self.PACKET_SIZE:
            sof_idx = buf.find(self.SOF)
            if sof_idx == -1:
                # SOF is a single byte, so nothing buffered can start a packet.
                buf.clear()
                break

            # Discard bytes before SOF (in place, no buffer copy).
            if sof_idx > 0:
                del buf[:sof_idx]

            # Need SOF + TYPE + LEN before the header can be inspected.
            if len(buf) < self._HEADER_LEN:
                break

            packet_type = buf[1]
            payload_len = buf[2]

            # A TYPE or LEN mismatch means this 0xAA was not a real SOF.
            if packet_type != self.TYPE_AUDIO_V1 or payload_len != self.PAYLOAD_LEN:
                self.stats.length_errors += 1
                del buf[0]  # Remove false SOF, retry
                continue

            # Full packet size = SOF(1) + TYPE(1) + LEN(1) + PAYLOAD(8) + CRC(1) = 12
            total_len = self._HEADER_LEN + payload_len + 1
            if len(buf) < total_len:
                break  # Wait for more data

            packet = bytes(buf[:total_len])

            # CRC covers TYPE, LEN and payload: everything except SOF and the
            # trailing CRC byte itself.
            if self._crc8_atm(packet[1:-1]) != packet[-1]:
                self.stats.crc_errors += 1
                del buf[0]  # Drop only the SOF so a packet hidden inside is still found
                continue

            # Parse payload (little-endian)
            timestamp_ms, rms_db_x10, freq_hz = self._PAYLOAD.unpack_from(
                packet, self._HEADER_LEN
            )

            # NOTE(review): any range validation that existed here was lost
            # together with the truncated text, and its bounds cannot be
            # recovered from this file. Until they are restored from the spec,
            # stats.range_errors is never incremented and every CRC-valid
            # packet is reported with valid=True.
            packets.append(
                AudioMetrics(
                    timestamp_ms=timestamp_ms,
                    rms_db=rms_db_x10 / 10.0,  # wire value is in tenths of a dB
                    freq_hz=freq_hz,
                )
            )
            self.stats.packets_received += 1
            del buf[:total_len]  # Consume the packet just parsed

        return packets

    def get_stats(self) -> ProtocolStats:
        """Get current statistics"""
        return self.stats