82 lines
2.2 KiB
Python
82 lines
2.2 KiB
Python
import struct
|
|
|
|
import pytest
|
|
|
|
import sys
|
|
from pathlib import Path
|
|
|
|
sys.path.insert(0, str(Path(__file__).parent.parent))
|
|
|
|
from protocol_parser import ProtocolParser
|
|
|
|
|
|
def build_packet(timestamp_ms: int, rms_db_x10: int, freq_hz: int) -> bytes:
    """Assemble one audio-v1 frame as raw wire bytes.

    Frame layout: the SOF byte, a two-byte header (packet type and payload
    length), the payload packed little-endian as ``<IhH`` (uint32 timestamp,
    int16 RMS value, uint16 frequency), and a trailing CRC byte.

    Args:
        timestamp_ms: Timestamp field, packed as an unsigned 32-bit integer.
        rms_db_x10: RMS field, packed as a signed 16-bit integer.
        freq_hz: Frequency field, packed as an unsigned 16-bit integer.

    Returns:
        The complete frame, ready to be passed to ``ProtocolParser.feed``.
    """
    start_marker = bytes((ProtocolParser.SOF,))

    # The CRC covers the header plus the payload (wire bytes 1..10);
    # the SOF byte itself is not part of the checksummed region.
    checksummed = b"".join(
        (
            bytes((ProtocolParser.TYPE_AUDIO_V1, ProtocolParser.PAYLOAD_LEN)),
            struct.pack("<IhH", timestamp_ms, rms_db_x10, freq_hz),
        )
    )
    trailer = bytes((ProtocolParser._crc8_atm(checksummed),))

    return b"".join((start_marker, checksummed, trailer))
|
|
|
|
|
|
def test_valid_packet():
    """A single well-formed frame is decoded and counted as received."""
    p = ProtocolParser()
    raw = build_packet(timestamp_ms=1234, rms_db_x10=-123, freq_hz=440)

    packets = p.feed(raw)

    assert len(packets) == 1
    pkt = packets[0]
    assert pkt.valid is True
    assert pkt.timestamp_ms == 1234
    # rms_db is checked against a float literal; compare with a tolerance so
    # the test does not depend on how the parser scales the raw integer
    # (e.g. ``/ 10`` versus ``* 0.1`` can differ in the last bit).
    assert pkt.rms_db == pytest.approx(-12.3)
    assert pkt.freq_hz == 440

    # The parser's counters must reflect exactly one good frame.
    st = p.get_stats()
    assert st.packets_received == 1
    assert st.crc_errors == 0
|
|
|
|
|
|
def test_bad_crc_packet():
    """A frame with a corrupted CRC byte yields no packet and one CRC error."""
    parser = ProtocolParser()
    good_frame = build_packet(timestamp_ms=1, rms_db_x10=-10, freq_hz=1000)

    # Corrupt the CRC: invert every bit of the trailing byte so it cannot
    # match the checksum of the preceding bytes.
    broken_frame = good_frame[:-1] + bytes((good_frame[-1] ^ 0xFF,))

    decoded = parser.feed(broken_frame)

    assert decoded == []
    stats = parser.get_stats()
    assert stats.packets_received == 0
    assert stats.crc_errors == 1
|
|
|
|
|
|
def test_garbage_then_valid_packet_resync():
    """Leading junk bytes are skipped and the following valid frame is decoded."""
    p = ProtocolParser()

    # Junk prefix; per the original author's note it contains a false SOF
    # byte in addition to plain garbage.
    garbage = b"\x00\xff\xaa\x01\x02\x03\x04"
    raw = garbage + build_packet(timestamp_ms=777, rms_db_x10=-321, freq_hz=1234)

    packets = p.feed(raw)

    assert len(packets) == 1
    assert packets[0].timestamp_ms == 777
    # Tolerant float comparison: the expected value is a float literal and
    # the parser's integer-to-float scaling may not be bit-exact.
    assert packets[0].rms_db == pytest.approx(-32.1)
    assert packets[0].freq_hz == 1234
    # The parser is expected to "swallow" the garbage, recording at least
    # one length error while resynchronising.
    assert p.get_stats().length_errors >= 1
|
|
|
|
|
|
def test_two_packets_in_one_chunk():
    """Two back-to-back frames fed in one call are both decoded, in order."""
    parser = ProtocolParser()

    first_frame = build_packet(timestamp_ms=10, rms_db_x10=-100, freq_hz=500)
    second_frame = build_packet(timestamp_ms=20, rms_db_x10=-200, freq_hz=600)

    # Feed both frames as a single contiguous chunk.
    decoded = parser.feed(first_frame + second_frame)

    assert len(decoded) == 2
    assert decoded[0].timestamp_ms == 10
    assert decoded[1].timestamp_ms == 20
|