Compare commits

..

10 Commits

60 changed files with 2800 additions and 126 deletions

12
.example.env Normal file
View File

@@ -0,0 +1,12 @@
# collector
SERIAL_PORT=/dev/ttyACM0
BAUDRATE=115200
# DB
DB_NAME=audio_analyzer
DB_USER=postgres
DB_PASSWORD=postgres
DB_PORT=5432
# api
DATABASE_URL=postgresql+asyncpg://postgres:postgres@db:5432/audio_analyzer

3
.gitignore vendored
View File

@@ -56,3 +56,6 @@ Mkfile.old
dkms.conf
# End of https://www.toptal.com/developers/gitignore/api/c
Build/
*.bin

3
.gitmodules vendored
View File

@@ -10,3 +10,6 @@
[submodule "firmware/Drivers/CMSIS/Core"]
path = firmware/Drivers/CMSIS/Core
url = https://github.com/STMicroelectronics/cmsis_core.git
[submodule "firmware/Middlewares/CMSIS-DSP"]
path = firmware/Middlewares/CMSIS-DSP
url = https://github.com/ARM-software/CMSIS-DSP.git

View File

@@ -1,41 +0,0 @@
#!/usr/bin/env python3
import sys
import time
import serial
# Настройки порта
SERIAL_PORT = "/dev/ttyACM0"
BAUD_RATE = 115200
def read_serial_data():
try:
# Открытие порта. Timeout позволяет прерывать блокирующее чтение.
ser = serial.Serial(SERIAL_PORT, BAUD_RATE, timeout=1)
print(f"Connected to {SERIAL_PORT}")
while True:
if ser.in_waiting > 0:
# Чтение строки, декодирование и удаление пробелов
line = ser.readline().decode("utf-8", errors="replace").strip()
if line:
print(f"[RX]: {line}")
else:
# Небольшая пауза, чтобы не грузить CPU ПК
time.sleep(0.01)
except serial.SerialException as e:
print(f"Error opening serial port: {e}")
print(
"Hint: Check if /dev/ttyACM0 exists and you have permissions (group uucp)."
)
except KeyboardInterrupt:
print("\nExiting...")
finally:
if "ser" in locals() and ser.is_open:
ser.close()
if __name__ == "__main__":
read_serial_data()

56
db/init.sql Normal file
View File

@@ -0,0 +1,56 @@
-- TimescaleDB hypertable for time-series audio metrics
-- Enable TimescaleDB extension
CREATE EXTENSION IF NOT EXISTS timescaledb;
-- Create audio data table
CREATE TABLE IF NOT EXISTS audio_data (
time TIMESTAMPTZ NOT NULL,
rms_db REAL CHECK (rms_db >= -50.0 AND rms_db <= 0.0),
frequency_hz INTEGER CHECK (frequency_hz >= 100 AND frequency_hz <= 8000),
is_silence BOOLEAN NOT NULL DEFAULT FALSE
);
-- Convert to hypertable (time-series optimization)
SELECT create_hypertable('audio_data', 'time', if_not_exists => TRUE);
-- Create index for frequency queries (exclude silence for performance)
CREATE INDEX IF NOT EXISTS idx_frequency
ON audio_data(frequency_hz)
WHERE NOT is_silence;
-- Create index for time-based queries
CREATE INDEX IF NOT EXISTS idx_time_desc
ON audio_data(time DESC);
-- Optional: Create continuous aggregate for 1-minute averages
CREATE MATERIALIZED VIEW IF NOT EXISTS audio_data_1min
WITH (timescaledb.continuous) AS
SELECT
time_bucket('1 minute', time) AS bucket,
AVG(rms_db) AS avg_rms_db,
MAX(rms_db) AS max_rms_db,
MIN(rms_db) AS min_rms_db,
mode() WITHIN GROUP (ORDER BY frequency_hz) AS dominant_freq_hz,
SUM(CASE WHEN is_silence THEN 1 ELSE 0 END)::REAL / COUNT(*) AS silence_ratio
FROM audio_data
GROUP BY bucket
WITH NO DATA;
-- Refresh policy: update aggregate every 5 minutes
SELECT add_continuous_aggregate_policy('audio_data_1min',
start_offset => INTERVAL '1 hour',
end_offset => INTERVAL '1 minute',
schedule_interval => INTERVAL '5 minutes',
if_not_exists => TRUE
);
-- Data retention: keep raw data for 7 days
SELECT add_retention_policy('audio_data',
INTERVAL '7 days',
if_not_exists => TRUE
);
-- Grant permissions
GRANT ALL ON audio_data TO postgres;
GRANT SELECT ON audio_data_1min TO postgres;

84
docker-compose.yml Normal file
View File

@@ -0,0 +1,84 @@
services:
db:
image: timescale/timescaledb:2.13.1-pg16
container_name: audio_timescaledb
environment:
POSTGRES_DB: ${DB_NAME:-audio_analyzer}
POSTGRES_USER: ${DB_USER:-postgres}
POSTGRES_PASSWORD: ${DB_PASSWORD:-postgres}
volumes:
- timescale_data:/var/lib/postgresql/data
- ./db/init.sql:/docker-entrypoint-initdb.d/init.sql:ro
ports:
- "${DB_PORT:-5432}:5432"
networks:
- audio_network
healthcheck:
test: ["CMD-SHELL", "pg_isready -U ${DB_USER:-postgres}"]
interval: 10s
timeout: 5s
retries: 5
restart: unless-stopped
collector:
build:
context: ./services/collector
dockerfile: Dockerfile
container_name: audio_collector
depends_on:
db:
condition: service_healthy
environment:
SERIAL_PORT: ${SERIAL_PORT:-/dev/ttyACM0}
BAUDRATE: ${BAUDRATE:-115200}
DB_HOST: db
DB_PORT: 5432
DB_NAME: ${DB_NAME:-audio_analyzer}
DB_USER: ${DB_USER:-postgres}
DB_PASSWORD: ${DB_PASSWORD:-postgres}
devices:
- "${SERIAL_PORT:-/dev/ttyACM0}:${SERIAL_PORT:-/dev/ttyACM0}"
networks:
- audio_network
restart: unless-stopped
logging:
driver: "json-file"
options:
max-size: "10m"
max-file: "3"
api:
build:
context: ./services/api
dockerfile: Dockerfile
container_name: audio_api
depends_on:
db:
condition: service_healthy
environment:
DATABASE_URL: postgresql+asyncpg://postgres:postgres@db:5432/audio_analyzer
ports:
- "8000:8000"
volumes:
- ./services/api:/app
networks:
- audio_network
healthcheck:
test:
[
"CMD-SHELL",
"curl --fail http://localhost:8000/api/v1/health/live || exit 1",
]
interval: 10s
timeout: 5s
retries: 5
restart: unless-stopped
volumes:
timescale_data:
driver: local
networks:
audio_network:
driver: bridge

View File

@@ -1,6 +1,9 @@
#ifndef FREERTOS_CONFIG_H
#define FREERTOS_CONFIG_H
#define configCHECK_FOR_STACK_OVERFLOW 2
#define configUSE_MALLOC_FAILED_HOOK 1
#define configUSE_PREEMPTION 1
#define configUSE_IDLE_HOOK 0
#define configUSE_TICK_HOOK 0
@@ -8,7 +11,7 @@
#define configTICK_RATE_HZ ((TickType_t)1000)
#define configMAX_PRIORITIES (5)
#define configMINIMAL_STACK_SIZE ((unsigned short)128)
#define configTOTAL_HEAP_SIZE ((size_t)(10 * 1024))
#define configTOTAL_HEAP_SIZE ((size_t)(8 * 1024))
#define configMAX_TASK_NAME_LEN (16)
#define configUSE_16_BIT_TICKS 0
#define configIDLE_SHOULD_YIELD 1

View File

@@ -0,0 +1,29 @@
#ifndef AUDIO_ADC_H
#define AUDIO_ADC_H
#include <stdbool.h>
#include "audio_config.h"
/**
* @brief Инициализация ADC, DMA и Timer для audio capture
* @param callback Функция, вызываемая при заполнении буфера
* @return true если успешно, false при ошибке
*/
bool audio_adc_init(audio_buffer_ready_callback_t callback);
/**
* @brief Запуск непрерывного захвата аудио
*/
void audio_adc_start(void);
/**
* @brief Остановка захвата аудио
*/
void audio_adc_stop(void);
/**
* @brief Получить текущее количество обработанных буферов
*/
uint32_t audio_adc_get_buffer_count(void);
#endif /* AUDIO_ADC_H */

View File

@@ -0,0 +1,22 @@
#ifndef AUDIO_CONFIG_H
#define AUDIO_CONFIG_H
#include <stdint.h>
// Audio Configuration
#define AUDIO_SAMPLE_RATE 22050U
#define AUDIO_BUFFER_SIZE 512U
// ADC Configuration
#define AUDIO_ADC_CHANNEL 1U // PA1 = ADC1_IN1
// Timer Configuration (TIM2 для 72 MHz)
#define AUDIO_TIMER_PRESCALER 0U
#define AUDIO_TIMER_PERIOD 3264U // 72MHz / 3265 ≈ 22050 Hz
// Data Types
typedef uint16_t audio_sample_t;
typedef void (
*audio_buffer_ready_callback_t)(audio_sample_t* buffer, uint32_t size);
#endif /* AUDIO_CONFIG_H */

View File

@@ -0,0 +1,20 @@
#pragma once
#include <stdbool.h>
#include <stdint.h>
#include "audio_config.h"
typedef struct {
// dBFS (потом можно добавить калибровочный оффсет до dB SPL)
float rms_dbfs;
float peak_hz; // доминантная частота
float peak_mag; // амплитуда бина (относительная)
uint8_t clipped; // 1 если был клиппинг (0 или 4095)
} audio_metrics_t;
bool audio_processor_init(void);
// process ровно 512 сэмплов
bool audio_processor_process_512(
const audio_sample_t* samples,
audio_metrics_t* out);

15
firmware/App/Inc/health.h Normal file
View File

@@ -0,0 +1,15 @@
#ifndef HEALTH_H
#define HEALTH_H
#include <stdbool.h>
#include <stdint.h>
void health_kick_watchdog(void);
void health_init_watchdog(void);
void health_update_led(float freq_hz, float rms_dbfs);
void health_led_task(void *param);
#endif // HEALTH_H

View File

@@ -0,0 +1,40 @@
#ifndef PROTOCOL_H
#define PROTOCOL_H
#include <stddef.h>
#include <stdint.h>
// Protocol Constants
#define PROTOCOL_SOF 0xAA
#define PACKET_TYPE_AUDIO 0x02
#define PACKET_LEN_V1 0x08 // Payload length (excluding SOF, TYPE, LEN, CRC)
#define PACKET_TOTAL_SIZE 12
// CRC8-ATM Constants
#define CRC8_POLY 0x07
#define CRC8_INIT 0x00
/**
* @brief Calculates CRC-8/ATM over the data buffer.
* Polynomial: x^8 + x^2 + x + 1 (0x07)
* Init: 0x00, RefIn: false, RefOut: false, XorOut: 0x00
* @param data Pointer to data buffer
* @param len Length of data
* @return Calculated CRC8
*/
uint8_t crc8_atm(const uint8_t *data, size_t len);
/**
* @brief Encodes the audio metric packet into the wire format.
* @param buf Output buffer (must be at least 12 bytes)
* @param timestamp_ms Timestamp in milliseconds
* @param rms_dbfs RMS value in dBFS (float)
* @param freq_hz Peak frequency in Hz (float)
*/
void protocol_pack_v1(
uint8_t *buf,
uint32_t timestamp_ms,
float rms_dbfs,
float freq_hz);
#endif // PROTOCOL_H

View File

@@ -19,8 +19,8 @@ extern "C" {
// Конфигурация CDC (Communication Device Class)
#define CFG_TUD_CDC 1
#define CFG_TUD_CDC_RX_BUFSIZE 64
#define CFG_TUD_CDC_TX_BUFSIZE 64
#define CFG_TUD_CDC_RX_BUFSIZE 256
#define CFG_TUD_CDC_TX_BUFSIZE 256
// Endpoint буферизация
#define CFG_TUD_ENDPOINT0_SIZE 64

View File

@@ -0,0 +1,168 @@
#include "audio_adc.h"
#include <string.h>
#include "stm32f1xx.h"
// Один непрерывный DMA-буфер: 2 * 512 = 1024 семпла
static audio_sample_t dma_buffer[2 * AUDIO_BUFFER_SIZE];
// Callback функция
static audio_buffer_ready_callback_t user_callback = NULL;
// Статистика (для отладки)
static volatile uint32_t buffer_count = 0;
static volatile uint32_t dma_half_transfer_count = 0;
static volatile uint32_t dma_full_transfer_count = 0;
// Private Function Prototypes
static void audio_gpio_init(void);
static void audio_timer_init(void);
static void audio_adc_hw_init(void);
static void audio_dma_init(void);
bool audio_adc_init(audio_buffer_ready_callback_t callback) {
if (callback == NULL) return false;
user_callback = callback;
audio_gpio_init();
audio_timer_init();
audio_adc_hw_init();
audio_dma_init();
return true;
}
void audio_adc_start(void) {
DMA1_Channel1->CCR |= DMA_CCR_EN;
ADC1->CR2 |= ADC_CR2_ADON;
TIM3->CR1 |= TIM_CR1_CEN;
}
void audio_adc_stop(void) {
TIM3->CR1 &= ~TIM_CR1_CEN;
ADC1->CR2 &= ~ADC_CR2_ADON;
DMA1_Channel1->CCR &= ~DMA_CCR_EN;
}
uint32_t audio_adc_get_buffer_count(void) {
return buffer_count;
}
static void audio_gpio_init(void) {
RCC->APB2ENR |= RCC_APB2ENR_IOPAEN;
// PA1 analog
GPIOA->CRL &= ~(0xF << 4);
}
static void audio_timer_init(void) {
// Включаем тактирование Timer3
RCC->APB1ENR |= RCC_APB1ENR_TIM3EN;
// Настраиваем TIM3 для 22050 Hz
TIM3->PSC = AUDIO_TIMER_PRESCALER;
TIM3->ARR = AUDIO_TIMER_PERIOD;
// TRGO = Update event
TIM3->CR2 &= ~TIM_CR2_MMS;
TIM3->CR2 |= TIM_CR2_MMS_1;
TIM3->CR1 |= TIM_CR1_ARPE;
TIM3->EGR |= TIM_EGR_UG;
TIM3->CNT = 0;
}
static void audio_adc_hw_init(void) {
RCC->APB2ENR |= RCC_APB2ENR_ADC1EN;
RCC->CFGR &= ~RCC_CFGR_ADCPRE;
RCC->CFGR |= RCC_CFGR_ADCPRE_DIV6;
ADC1->CR2 = 0;
ADC1->CR1 = 0;
ADC1->CR1 &= ~ADC_CR1_SCAN;
ADC1->CR2 &= ~ADC_CR2_CONT;
// EXTSEL = 011 (Timer2 TRGO), EXTTRIG enable
ADC1->CR2 &= ~ADC_CR2_EXTSEL;
ADC1->CR2 |= (0x4U << 17); // TIM3_TRGO
ADC1->CR2 |= ADC_CR2_EXTTRIG;
ADC1->CR2 |= ADC_CR2_DMA;
ADC1->CR2 &= ~ADC_CR2_ALIGN;
ADC1->SQR1 &= ~ADC_SQR1_L;
ADC1->SQR3 &= ~ADC_SQR3_SQ1;
ADC1->SQR3 |= (AUDIO_ADC_CHANNEL << ADC_SQR3_SQ1_Pos);
ADC1->SMPR2 &= ~ADC_SMPR2_SMP1;
ADC1->SMPR2 |= ADC_SMPR2_SMP1_0; // 7.5 cycles
// calibration
ADC1->CR2 |= ADC_CR2_ADON;
for (volatile int i = 0; i < 1000; i++) {}
ADC1->CR2 |= ADC_CR2_CAL;
while (ADC1->CR2 & ADC_CR2_CAL) {}
ADC1->CR2 &= ~ADC_CR2_ADON;
}
static void audio_dma_init(void) {
RCC->AHBENR |= RCC_AHBENR_DMA1EN;
DMA1_Channel1->CCR &= ~DMA_CCR_EN;
while (DMA1_Channel1->CCR & DMA_CCR_EN) {}
// ADC1 DR -> RAM
DMA1_Channel1->CPAR = (uint32_t)&ADC1->DR;
// ВАЖНО: CMAR указывает на непрерывный буфер 1024 samples
DMA1_Channel1->CMAR = (uint32_t)dma_buffer;
// ВАЖНО: 2 * 512 = 1024 samples
DMA1_Channel1->CNDTR = 2 * AUDIO_BUFFER_SIZE;
uint32_t ccr = 0;
ccr |= DMA_CCR_MINC;
ccr |= DMA_CCR_CIRC;
ccr |= DMA_CCR_HTIE;
ccr |= DMA_CCR_TCIE;
ccr |= DMA_CCR_PL_1; // high
ccr |= DMA_CCR_MSIZE_0; // 16-bit
ccr |= DMA_CCR_PSIZE_0; // 16-bit
DMA1_Channel1->CCR = ccr;
NVIC_SetPriority(DMA1_Channel1_IRQn, 6);
NVIC_EnableIRQ(DMA1_Channel1_IRQn);
}
void DMA1_Channel1_IRQHandler(void) {
uint32_t isr = DMA1->ISR;
if (isr & DMA_ISR_HTIF1) {
DMA1->IFCR = DMA_IFCR_CHTIF1;
dma_half_transfer_count++;
// первая половина: [0 .. 511]
if (user_callback) { user_callback(&dma_buffer[0], AUDIO_BUFFER_SIZE); }
buffer_count++;
}
if (isr & DMA_ISR_TCIF1) {
DMA1->IFCR = DMA_IFCR_CTCIF1;
dma_full_transfer_count++;
// вторая половина: [512 .. 1023]
if (user_callback) {
user_callback(&dma_buffer[AUDIO_BUFFER_SIZE], AUDIO_BUFFER_SIZE);
}
buffer_count++;
}
if (isr & DMA_ISR_TEIF1) {
DMA1->IFCR = DMA_IFCR_CTEIF1;
// TODO: error handling
}
}

View File

@@ -0,0 +1,112 @@
#include "audio_processor.h"
#include <math.h>
#include <string.h>
#include "arm_math.h"
#include "stm32f1xx.h"
#ifndef AUDIO_FFT_SIZE
#define AUDIO_FFT_SIZE 512U
#endif
#if (AUDIO_FFT_SIZE != 512U)
#error "This module currently expects AUDIO_FFT_SIZE == 512"
#endif
#define ADC_FULL_SCALE 4095.0f
#define ADC_MID_SCALE 2048.0f
#define EPS_RMS 1e-12f
static arm_rfft_fast_instance_f32 rfft;
// ОПТИМИЗАЦИЯ: используем один буфер для in/out FFT
static float32_t fft_buffer[AUDIO_FFT_SIZE]; // 2KB
static float32_t mag[AUDIO_FFT_SIZE / 2U]; // 1KB (bins 0..N/2-1)
static uint32_t bin_min = 0;
static uint32_t bin_max = 0;
static float32_t hz_per_bin = 0.0f;
// Inline Hann window (без хранения коэффициентов)
static inline float32_t hann_coeff(uint32_t i, uint32_t n) {
const float32_t two_pi = 6.28318530717958647693f;
const float32_t denom = (float32_t)(n - 1U);
return 0.5f - 0.5f * arm_cos_f32(two_pi * (float32_t)i / denom);
}
bool audio_processor_init(void) {
// FFT init
if (arm_rfft_fast_init_512_f32(&rfft) != ARM_MATH_SUCCESS) { return false; }
hz_per_bin = ((float32_t)AUDIO_SAMPLE_RATE) / ((float32_t)AUDIO_FFT_SIZE);
// Диапазон поиска пика 100..8000 Hz
bin_min = (uint32_t)ceilf(100.0f / hz_per_bin);
bin_max = (uint32_t)floorf(8000.0f / hz_per_bin);
// safety
if (bin_min < 1U) bin_min = 1U;
const uint32_t last_bin = (AUDIO_FFT_SIZE / 2U) - 1U;
if (bin_max > last_bin) bin_max = last_bin;
return true;
}
bool audio_processor_process_512(
const audio_sample_t* samples,
audio_metrics_t* out) {
if (!samples || !out) return false;
// 1) Mean + clipping detect
uint32_t sum = 0;
uint8_t clipped = 0;
for (uint32_t i = 0; i < AUDIO_FFT_SIZE; i++) {
const uint16_t s = samples[i];
sum += s;
if (s == 0U || s == 4095U) clipped = 1;
}
const float32_t mean = (float32_t)sum / (float32_t)AUDIO_FFT_SIZE;
// 2) RMS of AC component + prepare FFT input (normalized, windowed)
float32_t acc = 0.0f;
for (uint32_t i = 0; i < AUDIO_FFT_SIZE; i++) {
// centered around 0, normalized to roughly [-1..1]
float32_t x = ((float32_t)samples[i] - mean) / ADC_MID_SCALE;
acc += x * x;
// apply window inline (saves 2KB RAM)
fft_buffer[i] = x * hann_coeff(i, AUDIO_FFT_SIZE);
}
const float32_t rms = sqrtf(acc / (float32_t)AUDIO_FFT_SIZE);
const float32_t rms_dbfs = 20.0f * log10f(rms + EPS_RMS);
// 3) FFT (in-place: fft_buffer используется для in/out)
arm_rfft_fast_f32(&rfft, fft_buffer, fft_buffer, 0);
// 4) Magnitudes for bins 0..N/2-1
// CMSIS layout: [Re(0), Im(0)=0, Re(1), Im(1), ..., Re(N/2), Im(N/2)=0]
// Мы берём bin 1..N/2-1 для поиска пика
arm_cmplx_mag_f32(fft_buffer, mag, AUDIO_FFT_SIZE / 2U);
// 5) Peak search in desired band (skip DC bin 0)
uint32_t best_bin = bin_min;
float32_t best_mag = 0.0f;
for (uint32_t k = bin_min; k <= bin_max; k++) {
const float32_t m = mag[k];
if (m > best_mag) {
best_mag = m;
best_bin = k;
}
}
out->rms_dbfs = rms_dbfs;
out->peak_mag = best_mag;
out->peak_hz = (float32_t)best_bin * hz_per_bin;
out->clipped = clipped;
return true;
}

68
firmware/App/Src/health.c Normal file
View File

@@ -0,0 +1,68 @@
#include "health.h"
#include <math.h>
#include "FreeRTOS.h"
#include "stm32f1xx.h"
#include "task.h"
// Default to "Heartbeat" mode (1 Hz)
static volatile uint32_t led_period_ms = 3000;
void health_init_watchdog(void) {
IWDG->KR = 0x5555;
IWDG->PR = 0x04; // Prescaler /64 -> 625 Hz
IWDG->RLR = 1875; // ~3 seconds
IWDG->KR = 0xCCCC;
IWDG->KR = 0xAAAA;
}
void health_kick_watchdog(void) {
IWDG->KR = 0xAAAA;
}
void health_update_led(float freq_hz, float rms_dbfs) {
if (rms_dbfs < -35.0f) {
led_period_ms = 3000;
return;
}
if (freq_hz < 100.0f) freq_hz = 100.0f;
if (freq_hz > 6000.0f) freq_hz = 6000.0f;
// 100..8000 Hz -> 1..5 Hz blink (period 1000..200 ms)
float t = (log10f(freq_hz) - log10f(100.0f)) /
(log10f(6000.0f) - log10f(100.0f)); // 0..1
float blink_hz = 1.0f + t * 20.0f; // 1..5
uint32_t period = (uint32_t)(3000.0f / blink_hz); // 1000..200 ms
if (period < 200) period = 50;
if (period > 3000) period = 3000;
led_period_ms = period;
}
void health_led_task(void *param) {
(void)param;
// 1) Включаем тактирование GPIOC и настраиваем PC13 как выход push-pull
// 2MHz
RCC->APB2ENR |= RCC_APB2ENR_IOPCEN;
GPIOC->CRH &= ~(GPIO_CRH_MODE13 | GPIO_CRH_CNF13);
GPIOC->CRH |= GPIO_CRH_MODE13_1; // 2 MHz output, push-pull (CNF=00)
// LED off initially (Blue Pill LED is active-low)
GPIOC->ODR |= GPIO_ODR_ODR13;
while (1) {
uint32_t period = led_period_ms;
uint32_t on_ms = 40;
if (on_ms > period / 2) on_ms = period / 2;
uint32_t off_ms = period - on_ms;
// LED active-low: 0 = ON, 1 = OFF
GPIOC->BSRR = GPIO_BSRR_BR13; // ON
vTaskDelay(pdMS_TO_TICKS(on_ms)); // держим заметный импульс
GPIOC->BSRR = GPIO_BSRR_BS13; // OFF
vTaskDelay(pdMS_TO_TICKS(off_ms)); // пауза
health_kick_watchdog();
}
}

View File

@@ -1,61 +1,220 @@
#include <stdio.h>
#include <string.h>
#include "FreeRTOS.h"
#include "audio_adc.h"
#include "audio_processor.h" // НОВОЕ
#include "health.h"
#include "protocol.h"
#include "queue.h"
#include "stm32f1xx.h"
#include "task.h"
#include "tusb.h"
void vApplicationStackOverflowHook(TaskHandle_t xTask, char *pcTaskName) {
(void)xTask;
(void)pcTaskName;
while (1) {
GPIOC->ODR ^= GPIO_ODR_ODR13;
for (volatile int i = 0; i < 50000; i++);
}
}
void vApplicationMallocFailedHook(void) {
taskDISABLE_INTERRUPTS();
while (1) {
GPIOC->ODR ^= GPIO_ODR_ODR13;
for (volatile int i = 0; i < 200000; i++) {}
}
}
static void panic_blink_forever(uint32_t delay_ms) {
// PC13 уже сконфигурирован в main
while (1) {
GPIOC->ODR ^= GPIO_ODR_ODR13;
vTaskDelay(pdMS_TO_TICKS(delay_ms));
}
}
// === Структуры данных ===
typedef struct {
float rms_dbfs;
float peak_hz;
float peak_mag;
uint8_t clipped;
uint32_t timestamp_ms;
} audio_metrics_packet_t;
static QueueHandle_t audio_metrics_queue = NULL;
static volatile uint32_t buffer_counter = 0;
// === System Clock ===
void SystemClock_Config(void) {
RCC->CR |= RCC_CR_HSEON;
while (!(RCC->CR & RCC_CR_HSERDY));
FLASH->ACR |= FLASH_ACR_LATENCY_2;
FLASH->ACR = FLASH_ACR_LATENCY_2;
// ЯВНО обнуляем бит USBPRE (div 1.5 для получения 48MHz USB)
RCC->CFGR &= ~RCC_CFGR_USBPRE; // <-- ДОБАВЛЕНО!
RCC->CFGR |= (RCC_CFGR_PLLSRC | RCC_CFGR_PLLMULL9);
RCC->CFGR &= ~RCC_CFGR_PLLMULL;
RCC->CFGR |= RCC_CFGR_PLLMULL9;
RCC->CFGR |= RCC_CFGR_PLLSRC;
RCC->CFGR &= ~RCC_CFGR_USBPRE;
RCC->CR |= RCC_CR_PLLON;
while (!(RCC->CR & RCC_CR_PLLRDY));
RCC->CFGR &= ~RCC_CFGR_SW;
RCC->CFGR |= RCC_CFGR_SW_PLL;
while ((RCC->CFGR & RCC_CFGR_SWS) != RCC_CFGR_SWS_PLL);
SystemCoreClock = 72000000;
}
// Задача USB (ТОЛЬКО tud_task, ничего больше!)
// === Audio Callback (НОВОЕ: копируем в очередь для обработки) ===
// Буфер для копирования из ISR
static audio_sample_t processing_buffer[AUDIO_BUFFER_SIZE];
void audio_buffer_ready(audio_sample_t *buffer, uint32_t size) {
buffer_counter++;
// Мигаем LED
/* if (buffer_counter % 5 == 0) { GPIOC->ODR ^= GPIO_ODR_ODR13; } */
// Копируем данные (ISR должен быть быстрым)
memcpy(processing_buffer, buffer, size * sizeof(audio_sample_t));
// Сигналим задаче обработки через notification
BaseType_t xHigherPriorityTaskWoken = pdFALSE;
extern TaskHandle_t audio_process_task_handle;
if (audio_process_task_handle != NULL) {
vTaskNotifyGiveFromISR(
audio_process_task_handle,
&xHigherPriorityTaskWoken);
portYIELD_FROM_ISR(xHigherPriorityTaskWoken);
}
}
// === Tasks ===
void usb_device_task(void *param) {
(void)param;
while (1) {
tud_task();
// Без задержки! tud_task() должен вызываться как можно чаще
vTaskDelay(pdMS_TO_TICKS(1));
}
}
TaskHandle_t audio_process_task_handle = NULL;
void audio_process_task(void *param) {
(void)param;
if (!audio_processor_init()) {
while (1) {
GPIOC->ODR ^= GPIO_ODR_ODR13;
vTaskDelay(pdMS_TO_TICKS(50));
health_kick_watchdog();
}
}
audio_metrics_t metrics;
TickType_t last_wake = xTaskGetTickCount();
const TickType_t period = pdMS_TO_TICKS(100); // 10 Hz
while (1) {
// 1) Ждём хотя бы один новый буфер от ISR
ulTaskNotifyTake(pdTRUE, portMAX_DELAY);
// 2) Выкидываем накопившиеся уведомления, чтобы не пытаться "догонять"
// прошлое
while (ulTaskNotifyTake(pdTRUE, 0) > 0) {}
// 3) Обрабатываем самый свежий буфер (processing_buffer
// перезаписывается в ISR)
if (audio_processor_process_512(processing_buffer, &metrics)) {
health_update_led(metrics.peak_hz, metrics.rms_dbfs);
audio_metrics_packet_t packet = {
.rms_dbfs = metrics.rms_dbfs,
.peak_hz = metrics.peak_hz,
.peak_mag = metrics.peak_mag,
.clipped = metrics.clipped,
.timestamp_ms = xTaskGetTickCount(),
};
(void)xQueueSend(audio_metrics_queue, &packet, 0);
}
health_kick_watchdog();
// ограничиваем частоту обработки
vTaskDelayUntil(&last_wake, period);
}
}
// Задача CDC
void cdc_task(void *param) {
(void)param;
// Buffer for packet (12 bytes)
uint8_t tx_buffer[PACKET_TOTAL_SIZE];
while (1) {
// Check if USB is connected
if (tud_cdc_connected()) {
if (tud_cdc_available()) {
uint8_t buf[64];
uint32_t count = tud_cdc_read(buf, sizeof(buf));
tud_cdc_write(buf, count);
audio_metrics_packet_t packet;
// Wait for data from DSP task
if (xQueueReceive(
audio_metrics_queue,
&packet,
pdMS_TO_TICKS(10)) == pdPASS) {
// Pack data according to FR-1.4 spec
protocol_pack_v1(
tx_buffer,
packet.timestamp_ms,
packet.rms_dbfs,
packet.peak_hz);
// Write to USB CDC
// Check available space just in case
if (tud_cdc_write_available() >= sizeof(tx_buffer)) {
tud_cdc_write(tx_buffer, sizeof(tx_buffer));
tud_cdc_write_flush();
}
}
vTaskDelay(pdMS_TO_TICKS(10)); // 10мс достаточно
} else {
// Flush queue if USB not connected to prevent stalling DSP task
// or just sleep longer.
vTaskDelay(pdMS_TO_TICKS(100));
}
}
}
// Задача LED (отдельно!)
void led_task(void *param) {
void audio_init_task(void *param) {
(void)param;
while (1) {
GPIOC->BSRR = GPIO_BSRR_BR13; // LED ON
vTaskDelay(pdMS_TO_TICKS(500));
GPIOC->BSRR = GPIO_BSRR_BS13; // LED OFF
vTaskDelay(pdMS_TO_TICKS(500));
// Индикация старта
for (int i = 0; i < 3; i++) {
GPIOC->ODR ^= GPIO_ODR_ODR13;
vTaskDelay(pdMS_TO_TICKS(100));
}
if (!audio_adc_init(audio_buffer_ready)) {
while (1) {
GPIOC->ODR ^= GPIO_ODR_ODR13;
vTaskDelay(pdMS_TO_TICKS(50));
}
}
audio_adc_start();
for (int i = 0; i < 5; i++) {
GPIOC->ODR ^= GPIO_ODR_ODR13;
vTaskDelay(pdMS_TO_TICKS(200));
}
vTaskDelete(NULL);
}
void force_usb_reset(void) {
@@ -68,28 +227,43 @@ void force_usb_reset(void) {
GPIOA->CRH |= GPIO_CRH_CNF12_0;
}
// === Main ===
int main(void) {
SystemClock_Config();
// Настройка LED
// LED GPIO (will be managed by health_led_task)
RCC->APB2ENR |= RCC_APB2ENR_IOPCEN;
GPIOC->CRH &= ~GPIO_CRH_CNF13;
GPIOC->CRH &= ~(GPIO_CRH_MODE13 | GPIO_CRH_CNF13);
GPIOC->CRH |= GPIO_CRH_MODE13_1;
force_usb_reset();
// Включаем USB
// USB
RCC->APB2ENR |= RCC_APB2ENR_IOPAEN;
RCC->APB1ENR |= RCC_APB1ENR_USBEN;
// Прерывания USB
NVIC_SetPriority(USB_HP_CAN1_TX_IRQn, 6);
NVIC_SetPriority(USB_LP_CAN1_RX0_IRQn, 6);
NVIC_SetPriority(USBWakeUp_IRQn, 6);
NVIC_EnableIRQ(USB_HP_CAN1_TX_IRQn);
NVIC_EnableIRQ(USB_LP_CAN1_RX0_IRQn);
NVIC_EnableIRQ(USBWakeUp_IRQn);
tusb_init();
// УВЕЛИЧИЛИ стек до 256!
// Initialize watchdog BEFORE starting tasks
health_init_watchdog();
// FFT queue
audio_metrics_queue = xQueueCreate(10, sizeof(audio_metrics_packet_t));
if (audio_metrics_queue == NULL) {
while (1) {
GPIOC->ODR ^= GPIO_ODR_ODR13;
for (volatile int i = 0; i < 100000; i++);
}
}
// Create tasks
xTaskCreate(
usb_device_task,
"usbd",
@@ -97,13 +271,31 @@ int main(void) {
NULL,
configMAX_PRIORITIES - 1,
NULL);
xTaskCreate(cdc_task, "cdc", 256, NULL, configMAX_PRIORITIES - 2, NULL);
xTaskCreate(led_task, "led", 128, NULL, 1, NULL);
xTaskCreate(cdc_task, "cdc", 320, NULL, configMAX_PRIORITIES - 2, NULL);
xTaskCreate(health_led_task, "health_led", 128, NULL, 1, NULL);
xTaskCreate(audio_init_task, "audio_init", 128, NULL, 2, NULL);
xTaskCreate(
audio_process_task,
"audio_proc",
512,
NULL,
configMAX_PRIORITIES - 2,
&audio_process_task_handle);
if (xTaskCreate(health_led_task, "health_led", 128, NULL, 1, NULL) !=
pdPASS) {
panic_blink_forever(100);
}
vTaskStartScheduler();
while (1);
while (1); // Should never reach here
}
// === USB Handlers ===
void USB_HP_CAN1_TX_IRQHandler(void) {
tud_int_handler(0);
}

View File

@@ -0,0 +1,51 @@
#include "protocol.h"
#include <math.h>
uint8_t crc8_atm(const uint8_t *data, size_t len) {
uint8_t crc = CRC8_INIT;
for (size_t i = 0; i < len; i++) {
crc ^= data[i];
for (uint8_t j = 0; j < 8; j++) {
if (crc & 0x80) {
crc = (crc << 1) ^ CRC8_POLY;
} else {
crc <<= 1;
}
}
}
return crc;
}
void protocol_pack_v1(
uint8_t *buf,
uint32_t timestamp_ms,
float rms_dbfs,
float freq_hz) {
// Header
buf[0] = PROTOCOL_SOF;
buf[1] = PACKET_TYPE_AUDIO;
buf[2] = PACKET_LEN_V1;
// Payload: Timestamp (4 bytes, Little Endian)
buf[3] = (uint8_t)(timestamp_ms & 0xFF);
buf[4] = (uint8_t)((timestamp_ms >> 8) & 0xFF);
buf[5] = (uint8_t)((timestamp_ms >> 16) & 0xFF);
buf[6] = (uint8_t)((timestamp_ms >> 24) & 0xFF);
// Payload: RMS_DB (2 bytes, Little Endian, x10, int16)
// Range check implicit by int16 cast, but clamping is safer
// Spec: -40..80 dB -> -400..800
// Note: Since DSP returns dBFS (negative), we just send it as is.
// E.g. -60.5 dB -> -605.
int16_t rms_fixed = (int16_t)(rms_dbfs * 10.0f);
buf[7] = (uint8_t)(rms_fixed & 0xFF);
buf[8] = (uint8_t)((rms_fixed >> 8) & 0xFF);
// Payload: FREQ_HZ (2 bytes, Little Endian, uint16)
uint16_t freq_fixed = (uint16_t)freq_hz;
buf[9] = (uint8_t)(freq_fixed & 0xFF);
buf[10] = (uint8_t)((freq_fixed >> 8) & 0xFF);
// CRC8 (Calculated over bytes 1..10: TYPE, LEN, Payload)
buf[11] = crc8_atm(&buf[1], 10);
}

View File

@@ -1,14 +1,17 @@
TARGET = stm32-usb-freertos
BUILD_DIR = Build
# --- Исходники ---
# 1. Приложение
# Приложение
C_SOURCES = \
App/Src/main.c \
App/Src/audio_adc.c \
App/Src/audio_processor.c \
App/Src/protocol.c \
App/Src/health.c \
App/Src/usb_descriptors.c \
App/Src/system_stm32f1xx.c \
# 2. FreeRTOS
# FreeRTOS
C_SOURCES += \
Middlewares/FreeRTOS/croutine.c \
Middlewares/FreeRTOS/event_groups.c \
@@ -19,7 +22,7 @@ Middlewares/FreeRTOS/timers.c \
Middlewares/FreeRTOS/portable/GCC/ARM_CM3/port.c \
Middlewares/FreeRTOS/portable/MemMang/heap_4.c
# 3. TinyUSB
# TinyUSB
C_SOURCES += \
Middlewares/TinyUSB/src/tusb.c \
Middlewares/TinyUSB/src/common/tusb_fifo.c \
@@ -28,7 +31,23 @@ Middlewares/TinyUSB/src/device/usbd_control.c \
Middlewares/TinyUSB/src/class/cdc/cdc_device.c \
Middlewares/TinyUSB/src/portable/st/stm32_fsdev/dcd_stm32_fsdev.c
# 4. Startup
# CMSIS-DSP sources
C_SOURCES += \
$(CMSIS_DSP)/Source/TransformFunctions/arm_cfft_radix8_f32.c \
$(CMSIS_DSP)/Source/TransformFunctions/arm_bitreversal2.c \
$(CMSIS_DSP)/Source/TransformFunctions/arm_rfft_fast_f32.c \
$(CMSIS_DSP)/Source/TransformFunctions/arm_rfft_fast_init_f32.c \
$(CMSIS_DSP)/Source/TransformFunctions/arm_cfft_f32.c \
$(CMSIS_DSP)/Source/TransformFunctions/arm_cfft_init_f32.c \
$(CMSIS_DSP)/Source/ComplexMathFunctions/arm_cmplx_mag_f32.c \
$(CMSIS_DSP)/Source/CommonTables/arm_const_structs.c \
$(CMSIS_DSP)/Source/CommonTables/arm_common_tables.c \
$(CMSIS_DSP)/Source/FastMathFunctions/arm_cos_f32.c
# CMSIS-DSP
CMSIS_DSP = Middlewares/CMSIS-DSP
# Startup
ASM_SOURCES = App/Src/startup_stm32f103xb.s
# --- Настройки компилятора ---
@@ -47,19 +66,29 @@ C_INCLUDES = \
-IDrivers/CMSIS/Device/ST/STM32F1xx/Include \
-IMiddlewares/FreeRTOS/include \
-IMiddlewares/FreeRTOS/portable/GCC/ARM_CM3 \
-IMiddlewares/TinyUSB/src
-IMiddlewares/TinyUSB/src \
-I$(CMSIS_DSP)/Include \
-I$(CMSIS_DSP)/PrivateInclude
# Defines
C_DEFS = \
-DSTM32F103xB \
-DCFG_TUSB_MCU=OPT_MCU_STM32F1
-DCFG_TUSB_MCU=OPT_MCU_STM32F1 \
-DARM_MATH_CM3
CFLAGS = $(MCU) $(C_DEFS) $(C_INCLUDES) -O2 -Wall -fdata-sections -ffunction-sections -g -gdwarf-2
CFLAGS = $(MCU) $(C_DEFS) $(C_INCLUDES) -Os -Wall -fdata-sections -ffunction-sections -g -gdwarf-2
# Linker
LDSCRIPT = stm32f103c8.ld
LIBS = -lc -lm -lnosys
LDFLAGS = $(MCU) -T$(LDSCRIPT) $(LIBS) -Wl,-Map=$(BUILD_DIR)/$(TARGET).map,--cref -Wl,--gc-sections -Wl,--no-warn-rwx-segments
# LIBS = -lc -lm -lnosys
# LDFLAGS = $(MCU) -T$(LDSCRIPT) $(LIBS) -Wl,-Map=$(BUILD_DIR)/$(TARGET).map,--cref -Wl,--gc-sections -Wl,--no-warn-rwx-segments
# LDFLAGS = $(MCU) -T$(LDSCRIPT) --specs=nano.specs --specs=nosys.specs \
# -Wl,-Map=$(BUILD_DIR)/$(TARGET).map,--cref -Wl,--gc-sections
LIBS = -Wl,--start-group -lc_nano -lm -lgcc -lnosys -Wl,--end-group
LDFLAGS = $(MCU) -T$(LDSCRIPT) $(LIBS) \
-Wl,-Map=$(BUILD_DIR)/$(TARGET).map,--cref \
-Wl,--gc-sections \
-Wl,--no-warn-rwx-segments
# --- Генерация списка объектов ---
OBJECTS = $(addprefix $(BUILD_DIR)/,$(notdir $(C_SOURCES:.c=.o)))
@@ -103,4 +132,3 @@ flash:
st-flash write $(BUILD_DIR)/$(TARGET).bin 0x8000000
.PHONY: all clean flash

View File

@@ -4,8 +4,8 @@ ENTRY(Reset_Handler)
/* Highest address of the user mode stack */
_estack = ORIGIN(RAM) + LENGTH(RAM); /* end of "RAM" Ram type memory */
_Min_Heap_Size = 0x200; /* required amount of heap */
_Min_Stack_Size = 0x400; /* required amount of stack */
_Min_Heap_Size = 0x000; /* required amount of heap */
_Min_Stack_Size = 0x800; /* required amount of stack */
/* Memories definition */
MEMORY

176
services/api/.gitignore vendored Normal file
View File

@@ -0,0 +1,176 @@
# Created by https://www.toptal.com/developers/gitignore/api/python
# Edit at https://www.toptal.com/developers/gitignore?templates=python
### Python ###
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
cover/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
.pybuilder/
target/
# Jupyter Notebook
.ipynb_checkpoints
# IPython
profile_default/
ipython_config.py
# pyenv
# For a library or package, you might want to ignore these files since the code is
# intended to run in multiple environments; otherwise, check them in:
# .python-version
# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock
# poetry
# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control.
# This is especially recommended for binary packages to ensure reproducibility, and is more
# commonly ignored for libraries.
# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control
#poetry.lock
# pdm
# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control.
#pdm.lock
# pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it
# in version control.
# https://pdm.fming.dev/#use-with-ide
.pdm.toml
# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm
__pypackages__/
# Celery stuff
celerybeat-schedule
celerybeat.pid
# SageMath parsed files
*.sage.py
# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre type checker
.pyre/
# pytype static type analyzer
.pytype/
# Cython debug symbols
cython_debug/
# PyCharm
# JetBrains specific template is maintained in a separate JetBrains.gitignore that can
# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
# and can be added to the global gitignore or merged into this file. For a more nuclear
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
#.idea/
### Python Patch ###
# Poetry local configuration file - https://python-poetry.org/docs/configuration/#local-configuration
poetry.toml
# ruff
.ruff_cache/
# LSP config files
pyrightconfig.json
# End of https://www.toptal.com/developers/gitignore/api/python

15
services/api/Dockerfile Normal file
View File

@@ -0,0 +1,15 @@
FROM python:3.11-slim
WORKDIR /app
ENV PYTHONDONTWRITEBYTECODE=1 \
PYTHONUNBUFFERED=1
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
EXPOSE 8000
CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000", "--reload"]

View File

@@ -0,0 +1,36 @@
from datetime import datetime
from fastapi import APIRouter, Depends, HTTPException, Query
from sqlalchemy.ext.asyncio import AsyncSession
from app.db.session import get_db
from app.repositories.audio_repository import AudioRepository
from app.schemas.base import ApiResponse
from app.schemas.audio import AudioPoint
from app.services.audio_service import AudioService
router = APIRouter()
@router.get("/latest", response_model=ApiResponse[list[AudioPoint]])
async def latest(
limit: int = Query(100, ge=1, le=10000), db: AsyncSession = Depends(get_db)
):
service = AudioService(AudioRepository(db))
items = await service.latest(limit)
return ApiResponse(data=items, count=len(items))
@router.get("/range", response_model=ApiResponse[list[AudioPoint]])
async def range_(
time_from: datetime = Query(..., alias="from"),
time_to: datetime = Query(..., alias="to"),
db: AsyncSession = Depends(get_db),
):
if time_from >= time_to:
raise HTTPException(
status_code=400,
detail="'from' timestamp must be earlier than 'to' timestamp",
)
service = AudioService(AudioRepository(db))
items = await service.range(time_from, time_to)
return ApiResponse(data=items, count=len(items))

View File

@@ -0,0 +1,35 @@
from datetime import datetime
from typing import Optional
from fastapi import APIRouter, Depends, HTTPException, Query
from sqlalchemy.ext.asyncio import AsyncSession
from app.db.session import get_db
from app.repositories.audio_repository import AudioRepository
from app.schemas.base import ApiResponse
from app.schemas.events import LoudEvent
from app.services.events_service import EventsService
router = APIRouter()
@router.get("/loud", response_model=ApiResponse[list[LoudEvent]])
async def loud_events(
threshold: float = Query(
default=-35.0, ge=-50.0, le=0.0, description="RMS dB threshold"
),
time_from: Optional[datetime] = Query(None, alias="from"),
time_to: Optional[datetime] = Query(None, alias="to"),
db: AsyncSession = Depends(get_db),
):
if time_from and time_to and time_from >= time_to:
raise HTTPException(
status_code=400,
detail="'from' timestamp must be earlier than 'to' timestamp",
)
service = EventsService(AudioRepository(db))
events = await service.loud_events(
threshold=threshold, time_from=time_from, time_to=time_to
)
return ApiResponse(data=events, count=len(events))

View File

@@ -0,0 +1,42 @@
from datetime import datetime
import csv
import io
from fastapi import APIRouter, Depends, HTTPException, Query
from fastapi.responses import StreamingResponse
from sqlalchemy.ext.asyncio import AsyncSession
from app.db.session import get_db
from app.repositories.audio_repository import AudioRepository
router = APIRouter()
@router.get("/csv")
async def export_csv(
time_from: datetime = Query(..., alias="from"),
time_to: datetime = Query(..., alias="to"),
db: AsyncSession = Depends(get_db),
):
if time_from >= time_to:
raise HTTPException(
status_code=400,
detail="'from' timestamp must be earlier than 'to' timestamp",
)
repo = AudioRepository(db)
rows = await repo.range(time_from, time_to)
buf = io.StringIO()
w = csv.writer(buf)
w.writerow(["time", "rms_db", "frequency_hz", "is_silence"])
for r in rows:
w.writerow([r.time.isoformat(), r.rms_db, r.frequency_hz, r.is_silence])
buf.seek(0)
filename = f"audio_{time_from:%Y%m%d_%H%M%S}_to_{time_to:%Y%m%d_%H%M%S}.csv"
return StreamingResponse(
iter([buf.getvalue()]),
media_type="text/csv",
headers={"Content-Disposition": f"attachment; filename={filename}"},
)

View File

@@ -0,0 +1,19 @@
from fastapi import APIRouter, Depends
from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession
from app.db.session import get_db
from app.schemas.base import ApiResponse
router = APIRouter()
@router.get("/live", response_model=ApiResponse[dict])
async def live() -> ApiResponse[dict]:
return ApiResponse(data={"status": "alive"})
@router.get("/ready", response_model=ApiResponse[dict])
async def ready(db: AsyncSession = Depends(get_db)) -> ApiResponse[dict]:
await db.execute(text("SELECT 1"))
return ApiResponse(data={"status": "ready", "db": "ok"})

View File

@@ -0,0 +1,23 @@
from fastapi import APIRouter, Depends, HTTPException, Query
from sqlalchemy.ext.asyncio import AsyncSession
from app.db.session import get_db
from app.repositories.audio_repository import AudioRepository
from app.schemas.base import ApiResponse
from app.schemas.stats import StatsSummary
from app.services.stats_service import StatsService
router = APIRouter()
@router.get("/summary", response_model=ApiResponse[StatsSummary])
async def summary(
period: str = Query("1h", pattern="^(10s|1m|1h|6h|24h|7d|30d)$"),
db: AsyncSession = Depends(get_db),
):
service = StatsService(AudioRepository(db))
try:
data = await service.summary(period)
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
return ApiResponse(data=data)

View File

@@ -0,0 +1,9 @@
from fastapi import APIRouter
from app.api.v1.endpoints import audio, stats, events, export, health
router = APIRouter()
router.include_router(health.router, prefix="/health", tags=["health"])
router.include_router(audio.router, prefix="/audio", tags=["audio"])
router.include_router(stats.router, prefix="/stats", tags=["stats"])
router.include_router(events.router, prefix="/events", tags=["events"])
router.include_router(export.router, prefix="/export", tags=["export"])

View File

@@ -0,0 +1,11 @@
from pydantic_settings import BaseSettings, SettingsConfigDict
class Settings(BaseSettings):
model_config = SettingsConfigDict(env_file=".env", extra="ignore")
DATABASE_URL: str = "postgresql+asyncpg://postgres:postgres@db:5432/audio_analyzer"
API_V1_PREFIX: str = "/api/v1"
settings = Settings()

View File

@@ -0,0 +1,5 @@
from sqlalchemy.orm import DeclarativeBase
class Base(DeclarativeBase):
pass

View File

@@ -0,0 +1,12 @@
from typing import AsyncGenerator
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
from app.core.config import settings
engine = create_async_engine(settings.DATABASE_URL, echo=False, pool_pre_ping=True)
SessionLocal = async_sessionmaker(engine, expire_on_commit=False, autoflush=False)
async def get_db() -> AsyncGenerator[AsyncSession, None]:
async with SessionLocal() as session:
yield session

46
services/api/app/main.py Normal file
View File

@@ -0,0 +1,46 @@
from fastapi import FastAPI
import asyncio
from contextlib import asynccontextmanager, suppress
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from app.api.v1.router import router as v1_router
from app.core.config import settings
from app.ws.router import router as ws_router
from app.ws.broadcaster import audio_live_broadcaster
@asynccontextmanager
async def lifespan(app: FastAPI):
task = asyncio.create_task(audio_live_broadcaster())
try:
yield
finally:
task.cancel()
with suppress(asyncio.CancelledError):
await task
def create_app() -> FastAPI:
app = FastAPI(
title="Audio Analyzer API",
version="1.0.0",
docs_url="/api/docs",
redoc_url="/api/redoc",
lifespan=lifespan,
)
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
app.include_router(v1_router, prefix=settings.API_V1_PREFIX)
app.include_router(ws_router) # /ws/live
return app
app = create_app()

View File

@@ -0,0 +1,14 @@
from sqlalchemy import Boolean, Integer, Float
from sqlalchemy.dialects.postgresql import TIMESTAMP
from sqlalchemy.orm import Mapped, mapped_column
from app.db.base import Base
class AudioData(Base):
__tablename__ = "audio_data"
time: Mapped[object] = mapped_column(TIMESTAMP(timezone=True), primary_key=True)
rms_db: Mapped[float] = mapped_column(Float, nullable=False)
frequency_hz: Mapped[int] = mapped_column(Integer, nullable=False)
is_silence: Mapped[bool] = mapped_column(Boolean, nullable=False, default=False)

View File

@@ -0,0 +1,74 @@
from __future__ import annotations
from datetime import datetime, timedelta
from sqlalchemy import and_, func, select, text
from sqlalchemy.ext.asyncio import AsyncSession
from app.models.audio_data import AudioData
class AudioRepository:
def __init__(self, db: AsyncSession):
self.db = db
async def latest(self, limit: int) -> list[AudioData]:
q = select(AudioData).order_by(AudioData.time.desc()).limit(limit)
res = await self.db.execute(q)
return list(res.scalars().all())
async def range(self, time_from: datetime, time_to: datetime) -> list[AudioData]:
q = (
select(AudioData)
.where(and_(AudioData.time >= time_from, AudioData.time <= time_to))
.order_by(AudioData.time.asc())
)
res = await self.db.execute(q)
return list(res.scalars().all())
async def loud_samples(
self,
threshold: float,
time_from: datetime | None,
time_to: datetime | None,
) -> list[AudioData]:
cond = [AudioData.rms_db >= threshold]
if time_from:
cond.append(AudioData.time >= time_from)
if time_to:
cond.append(AudioData.time <= time_to)
q = select(AudioData).where(and_(*cond)).order_by(AudioData.time.asc())
res = await self.db.execute(q)
return list(res.scalars().all())
async def summary_since(self, since: datetime) -> dict:
q = select(
func.avg(AudioData.rms_db).label("avg_db"),
func.max(AudioData.rms_db).label("max_db"),
func.sum(func.case((AudioData.is_silence.is_(True), 1), else_=0)).label(
"silence_count"
),
func.count().label("total_count"),
).where(AudioData.time >= since)
res = await self.db.execute(q)
row = res.one()
# dominant freq excluding silence
fq = (
select(AudioData.frequency_hz, func.count().label("cnt"))
.where(and_(AudioData.time >= since, AudioData.is_silence.is_(False)))
.group_by(AudioData.frequency_hz)
.order_by(text("cnt DESC"))
.limit(1)
)
fres = await self.db.execute(fq)
frow = fres.first()
return {
"avg_db": float(row.avg_db or 0.0),
"max_db": float(row.max_db or 0.0),
"dominant_freq": int(frow[0]) if frow else 0,
"silence_count": int(row.silence_count or 0),
"total_count": int(row.total_count or 0),
}

View File

@@ -0,0 +1,9 @@
from datetime import datetime
from pydantic import BaseModel
class AudioPoint(BaseModel):
time: datetime
rms_db: float
frequency_hz: int
is_silence: bool

View File

@@ -0,0 +1,19 @@
from __future__ import annotations
from typing import Generic, TypeVar
from pydantic import BaseModel, Field
T = TypeVar("T")
class ApiError(BaseModel):
code: str = Field(..., examples=["validation_error", "db_error"])
message: str
details: dict | None = None
class ApiResponse(BaseModel, Generic[T]):
success: bool = True
errors: list[ApiError] | None = None
count: int | None = None
data: T | None = None

View File

@@ -0,0 +1,9 @@
from datetime import datetime
from pydantic import BaseModel
class LoudEvent(BaseModel):
time: datetime
rms_db: float
frequency_hz: int
duration_sec: float | None = None

View File

@@ -0,0 +1,8 @@
from pydantic import BaseModel
class StatsSummary(BaseModel):
avg_db: float
max_db: float
dominant_freq: int
silence_percent: float

View File

@@ -0,0 +1,16 @@
from datetime import datetime
from app.repositories.audio_repository import AudioRepository
from app.schemas.audio import AudioPoint
class AudioService:
def __init__(self, repo: AudioRepository):
self.repo = repo
async def latest(self, limit: int) -> list[AudioPoint]:
rows = await self.repo.latest(limit)
return [AudioPoint.model_validate(r, from_attributes=True) for r in rows]
async def range(self, time_from: datetime, time_to: datetime) -> list[AudioPoint]:
rows = await self.repo.range(time_from, time_to)
return [AudioPoint.model_validate(r, from_attributes=True) for r in rows]

View File

@@ -0,0 +1,54 @@
from __future__ import annotations
from datetime import datetime
from app.repositories.audio_repository import AudioRepository
from app.schemas.events import LoudEvent
class EventsService:
def __init__(self, repo: AudioRepository):
self.repo = repo
async def loud_events(
self,
threshold: float,
time_from: datetime | None,
time_to: datetime | None,
max_gap_sec: float = 1.0,
) -> list[LoudEvent]:
samples = await self.repo.loud_samples(threshold, time_from, time_to)
if not samples:
return []
events: list[LoudEvent] = []
start = samples[0].time
end = samples[0].time
max_db = samples[0].rms_db
freq = samples[0].frequency_hz
for s in samples[1:]:
gap = (s.time - end).total_seconds()
if gap <= max_gap_sec:
end = s.time
if s.rms_db > max_db:
max_db = s.rms_db
else:
events.append(
LoudEvent(
time=start,
rms_db=round(float(max_db), 2),
frequency_hz=int(freq),
duration_sec=round((end - start).total_seconds(), 2),
)
)
start, end, max_db, freq = s.time, s.time, s.rms_db, s.frequency_hz
events.append(
LoudEvent(
time=start,
rms_db=round(float(max_db), 2),
frequency_hz=int(freq),
duration_sec=round((end - start).total_seconds(), 2),
)
)
return events

View File

@@ -0,0 +1,35 @@
from datetime import datetime, timedelta
from app.repositories.audio_repository import AudioRepository
from app.schemas.stats import StatsSummary
_PERIODS = {
"10s": timedelta(seconds=10),
"1m": timedelta(minutes=1),
"1h": timedelta(hours=1),
"6h": timedelta(hours=6),
"24h": timedelta(hours=24),
"7d": timedelta(days=7),
"30d": timedelta(days=30),
}
class StatsService:
def __init__(self, repo: AudioRepository):
self.repo = repo
async def summary(self, period: str) -> StatsSummary:
if period not in _PERIODS:
raise ValueError(f"Unsupported period: {period}")
since = datetime.utcnow() - _PERIODS[period]
raw = await self.repo.summary_since(since)
total = raw["total_count"]
silence_percent = (raw["silence_count"] / total * 100.0) if total else 0.0
return StatsSummary(
avg_db=round(raw["avg_db"], 2),
max_db=round(raw["max_db"], 2),
dominant_freq=raw["dominant_freq"],
silence_percent=round(silence_percent, 2),
)

View File

@@ -0,0 +1,41 @@
from __future__ import annotations
import asyncio
from contextlib import suppress
from datetime import timezone
from sqlalchemy.ext.asyncio import AsyncSession
from app.db.session import SessionLocal
from app.repositories.audio_repository import AudioRepository
from app.ws.manager import manager
def _iso_z(dt) -> str:
# dt ожидается timezone-aware
return dt.astimezone(timezone.utc).isoformat().replace("+00:00", "Z")
async def audio_live_broadcaster(poll_interval_sec: float = 0.2) -> None:
last_time = None
while True:
try:
async with SessionLocal() as db: # AsyncSession
repo = AudioRepository(db)
rows = await repo.latest(1)
if rows:
row = rows[0]
if last_time is None or row.time > last_time:
last_time = row.time
await manager.broadcast_json(
{
"time": _iso_z(row.time),
"rms_db": float(row.rms_db),
"freq_hz": int(row.frequency_hz),
}
)
except Exception:
# чтобы WS не умирал из-за временных проблем с БД
pass
await asyncio.sleep(poll_interval_sec)

View File

@@ -0,0 +1,39 @@
from __future__ import annotations
import asyncio
from typing import Any
from fastapi import WebSocket
class ConnectionManager:
def __init__(self) -> None:
self._connections: set[WebSocket] = set()
self._lock = asyncio.Lock()
async def connect(self, ws: WebSocket) -> None:
await ws.accept()
async with self._lock:
self._connections.add(ws)
async def disconnect(self, ws: WebSocket) -> None:
async with self._lock:
self._connections.discard(ws)
async def broadcast_json(self, payload: dict[str, Any]) -> None:
async with self._lock:
conns = list(self._connections)
to_remove: list[WebSocket] = []
for ws in conns:
try:
await ws.send_json(payload)
except Exception:
to_remove.append(ws)
if to_remove:
async with self._lock:
for ws in to_remove:
self._connections.discard(ws)
manager = ConnectionManager()

View File

@@ -0,0 +1,19 @@
from fastapi import APIRouter, WebSocket
from starlette.websockets import WebSocketDisconnect
from app.ws.manager import manager
router = APIRouter()
@router.websocket("/ws/live")
async def ws_live(ws: WebSocket) -> None:
await manager.connect(ws)
try:
# Держим соединение
while True:
await ws.receive_text()
except WebSocketDisconnect:
await manager.disconnect(ws)
except Exception:
await manager.disconnect(ws)

View File

@@ -0,0 +1,15 @@
FROM python:3.11-slim
WORKDIR /app
ENV PYTHONDONTWRITEBYTECODE=1 \
PYTHONUNBUFFERED=1
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
EXPOSE 8000
CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000"]

View File

@@ -0,0 +1,8 @@
fastapi
uvicorn[standard]
sqlalchemy
asyncpg
pydantic
pydantic-settings
python-dateutil
websockets

176
services/collector/.gitignore vendored Normal file
View File

@@ -0,0 +1,176 @@
# Created by https://www.toptal.com/developers/gitignore/api/python
# Edit at https://www.toptal.com/developers/gitignore?templates=python
### Python ###
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
cover/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
.pybuilder/
target/
# Jupyter Notebook
.ipynb_checkpoints
# IPython
profile_default/
ipython_config.py
# pyenv
# For a library or package, you might want to ignore these files since the code is
# intended to run in multiple environments; otherwise, check them in:
# .python-version
# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock
# poetry
# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control.
# This is especially recommended for binary packages to ensure reproducibility, and is more
# commonly ignored for libraries.
# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control
#poetry.lock
# pdm
# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control.
#pdm.lock
# pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it
# in version control.
# https://pdm.fming.dev/#use-with-ide
.pdm.toml
# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm
__pypackages__/
# Celery stuff
celerybeat-schedule
celerybeat.pid
# SageMath parsed files
*.sage.py
# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre type checker
.pyre/
# pytype static type analyzer
.pytype/
# Cython debug symbols
cython_debug/
# PyCharm
# JetBrains specific template is maintained in a separate JetBrains.gitignore that can
# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
# and can be added to the global gitignore or merged into this file. For a more nuclear
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
#.idea/
### Python Patch ###
# Poetry local configuration file - https://python-poetry.org/docs/configuration/#local-configuration
poetry.toml
# ruff
.ruff_cache/
# LSP config files
pyrightconfig.json
# End of https://www.toptal.com/developers/gitignore/api/python

View File

@@ -0,0 +1,19 @@
FROM python:3.11-slim
WORKDIR /app
RUN apt-get update && apt-get install -y \
--no-install-recommends \
gcc \
&& rm -rf /var/lib/apt/lists/*
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY *.py ./
# Run as non-root user
RUN useradd -m -u 1000 collector && chown -R collector:collector /app
USER collector
CMD ["python", "main.py"]

View File

View File

@@ -0,0 +1,114 @@
#!/usr/bin/env python3
"""
FR-2.2: Audio Data Validation
Validates audio metrics against expected ranges
"""
from typing import NamedTuple
class ValidationResult(NamedTuple):
"""Validation result"""
valid: bool
error: str = ""
class AudioValidator:
"""
Validates audio metrics against hardware constraints and realistic ranges.
"""
# Hardware constraints (from FR spec)
RMS_MIN_DB = -40.0 # Noise floor
RMS_MAX_DB = 80.0 # Clipping threshold
FREQ_MIN_HZ = 100 # Below = unreliable (FFT bin size ~43Hz)
FREQ_MAX_HZ = 8000 # Nyquist @ 22.05kHz with safety margin
# Extended ranges for detection (not storage)
FREQ_MIN_EXTENDED_HZ = 20
FREQ_MAX_EXTENDED_HZ = 11000
@staticmethod
def validate_rms(rms_db: float) -> ValidationResult:
"""
Validate RMS value.
Args:
rms_db: RMS in dB
Returns:
ValidationResult with valid flag and error message
"""
if not isinstance(rms_db, (int, float)):
return ValidationResult(False, "RMS must be numeric")
if rms_db < AudioValidator.RMS_MIN_DB:
return ValidationResult(
False, f"RMS {rms_db:.1f}dB below minimum {AudioValidator.RMS_MIN_DB}dB"
)
if rms_db > AudioValidator.RMS_MAX_DB:
return ValidationResult(
False,
f"RMS {rms_db:.1f}dB exceeds maximum {AudioValidator.RMS_MAX_DB}dB",
)
return ValidationResult(True)
@staticmethod
def validate_frequency(freq_hz: int, strict: bool = True) -> ValidationResult:
"""
Validate frequency value.
Args:
freq_hz: Frequency in Hz
strict: If True, use tight range (100-8000Hz), else extended (20-11000Hz)
Returns:
ValidationResult with valid flag and error message
"""
if not isinstance(freq_hz, int):
return ValidationResult(False, "Frequency must be integer")
if strict:
min_hz = AudioValidator.FREQ_MIN_HZ
max_hz = AudioValidator.FREQ_MAX_HZ
else:
min_hz = AudioValidator.FREQ_MIN_EXTENDED_HZ
max_hz = AudioValidator.FREQ_MAX_EXTENDED_HZ
if freq_hz < min_hz:
return ValidationResult(
False, f"Frequency {freq_hz}Hz below minimum {min_hz}Hz"
)
if freq_hz > max_hz:
return ValidationResult(
False, f"Frequency {freq_hz}Hz exceeds maximum {max_hz}Hz"
)
return ValidationResult(True)
@staticmethod
def validate_packet(rms_db: float, freq_hz: int) -> ValidationResult:
"""
Validate complete audio packet.
Args:
rms_db: RMS in dB
freq_hz: Frequency in Hz
Returns:
ValidationResult with valid flag and error message
"""
rms_result = AudioValidator.validate_rms(rms_db)
if not rms_result.valid:
return rms_result
freq_result = AudioValidator.validate_frequency(freq_hz, strict=True)
if not freq_result.valid:
return freq_result
return ValidationResult(True)

View File

@@ -0,0 +1,165 @@
#!/usr/bin/env python3
"""
FR-2.3: Database Writer with Batch Processing
Buffers audio metrics and writes in batches (50 records or 5 seconds)
"""
import asyncio
import logging
from datetime import datetime, timezone
from typing import List, Optional
from dataclasses import dataclass
import asyncpg
logger = logging.getLogger(__name__)
@dataclass
class AudioRecord:
"""Single audio measurement record"""
timestamp: datetime
rms_db: float
freq_hz: int
is_silence: bool
class DatabaseWriter:
"""
Batched database writer for audio metrics.
Flushes on: 50 records OR 5 seconds timeout
"""
BATCH_SIZE = 50
BATCH_TIMEOUT = 5.0 # seconds
SILENCE_THRESHOLD_DB = -30.0 # dB below = silence
def __init__(self, db_url: str):
self.db_url = db_url
self.pool: Optional[asyncpg.Pool] = None
self.buffer: List[AudioRecord] = []
self.last_flush_time = asyncio.get_event_loop().time()
self._flush_task: Optional[asyncio.Task] = None
self._running = False
async def connect(self):
"""Establish database connection pool"""
try:
self.pool = await asyncpg.create_pool(
self.db_url, min_size=2, max_size=5, command_timeout=10.0
)
logger.info("Database connection pool established")
# Test connection
async with self.pool.acquire() as conn:
await conn.fetchval("SELECT 1")
except Exception as e:
logger.error(f"Failed to connect to database: {e}")
raise
async def close(self):
"""Close database connection and flush remaining data"""
self._running = False
if self._flush_task and not self._flush_task.done():
self._flush_task.cancel()
try:
await self._flush_task
except asyncio.CancelledError:
pass
await self.flush()
if self.pool:
await self.pool.close()
logger.info("Database connection closed")
async def start_auto_flush(self):
"""Start background task for timeout-based flushing"""
self._running = True
self._flush_task = asyncio.create_task(self._auto_flush_loop())
async def _auto_flush_loop(self):
"""Background task: flush buffer every BATCH_TIMEOUT seconds"""
while self._running:
try:
await asyncio.sleep(self.BATCH_TIMEOUT)
current_time = asyncio.get_event_loop().time()
if self.buffer and (
current_time - self.last_flush_time >= self.BATCH_TIMEOUT
):
await self.flush()
except asyncio.CancelledError:
break
except Exception as e:
logger.error(f"Error in auto-flush loop: {e}")
async def add_record(self, timestamp_ms: int, rms_db: float, freq_hz: int):
"""
Add single record to buffer. Flushes if batch size reached.
Args:
timestamp_ms: MCU timestamp in milliseconds
rms_db: RMS value in dB
freq_hz: Dominant frequency in Hz
"""
# Convert timestamp to datetime (use current time with ms offset)
# Note: MCU timestamp wraps around, use server time for absolute reference
timestamp = datetime.now(timezone.utc)
# Detect silence
is_silence = rms_db < self.SILENCE_THRESHOLD_DB
record = AudioRecord(
timestamp=timestamp, rms_db=rms_db, freq_hz=freq_hz, is_silence=is_silence
)
self.buffer.append(record)
logger.debug(
f"Buffered: rms={rms_db:.1f}dB freq={freq_hz}Hz "
f"silence={is_silence} (buffer={len(self.buffer)})"
)
# Flush if batch size reached
if len(self.buffer) >= self.BATCH_SIZE:
await self.flush()
async def flush(self):
"""Write all buffered records to database"""
if not self.buffer:
return
if not self.pool:
logger.error("Database pool not initialized, cannot flush")
return
records_to_write = self.buffer[:]
self.buffer.clear()
self.last_flush_time = asyncio.get_event_loop().time()
try:
async with self.pool.acquire() as conn:
# Prepare batch insert
records = [
(r.timestamp, r.rms_db, r.freq_hz, r.is_silence)
for r in records_to_write
]
await conn.executemany(
"""
INSERT INTO audio_data (time, rms_db, frequency_hz, is_silence)
VALUES ($1, $2, $3, $4)
""",
records,
)
logger.info(f"Flushed {len(records)} records to database")
except Exception as e:
logger.error(f"Failed to write batch to database: {e}")
# Re-add to buffer for retry (optional, could cause memory issues)
# self.buffer.extend(records_to_write)

144
services/collector/main.py Normal file
View File

@@ -0,0 +1,144 @@
#!/usr/bin/env python3
"""
FR-2: Audio Data Collector Service
Reads audio metrics from STM32, validates, and writes to TimescaleDB
"""
import asyncio
import logging
import os
import signal
import sys
from serial_reader import SerialReader
from audio_validator import AudioValidator
from db_writer import DatabaseWriter
from protocol_parser import AudioMetrics
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
)
logger = logging.getLogger(__name__)
class CollectorService:
"""Main collector service orchestrating serial reading and database writing"""
def __init__(self, serial_port: str, db_url: str, baudrate: int = 115200):
self.serial_reader = SerialReader(
port=serial_port, baudrate=baudrate, on_packet=self._handle_packet
)
self.db_writer = DatabaseWriter(db_url=db_url)
self.validator = AudioValidator()
self._shutdown_event = asyncio.Event()
async def _handle_packet(self, packet: AudioMetrics):
"""
Process received audio packet: validate and write to database.
Args:
packet: Parsed audio metrics packet
"""
# Validate packet
validation = self.validator.validate_packet(packet.rms_db, packet.freq_hz)
if not validation.valid:
logger.warning(
f"Invalid packet: {validation.error} "
f"(rms={packet.rms_db:.1f}dB freq={packet.freq_hz}Hz)"
)
return
# Write to database
try:
await self.db_writer.add_record(
timestamp_ms=packet.timestamp_ms,
rms_db=packet.rms_db,
freq_hz=packet.freq_hz,
)
except Exception as e:
logger.error(f"Failed to add record to database: {e}")
async def start(self):
"""Start collector service"""
logger.info("Starting Audio Data Collector Service")
try:
# Connect to database
await self.db_writer.connect()
await self.db_writer.start_auto_flush()
# Connect to serial port
await self.serial_reader.connect()
await self.serial_reader.start_reading()
logger.info("Service started successfully")
# Wait for shutdown signal
await self._shutdown_event.wait()
except Exception as e:
logger.error(f"Service startup failed: {e}")
raise
finally:
await self.stop()
async def stop(self):
"""Stop collector service gracefully"""
logger.info("Stopping Audio Data Collector Service")
# Disconnect serial reader
await self.serial_reader.disconnect()
# Close database writer (flushes remaining data)
await self.db_writer.close()
logger.info("Service stopped")
def shutdown(self):
"""Trigger graceful shutdown"""
logger.info("Shutdown requested")
self._shutdown_event.set()
def main():
"""Main entry point"""
# Read configuration from environment
SERIAL_PORT = os.getenv("SERIAL_PORT", "/dev/ttyACM0")
BAUDRATE = int(os.getenv("BAUDRATE", "115200"))
DB_HOST = os.getenv("DB_HOST", "localhost")
DB_PORT = os.getenv("DB_PORT", "5432")
DB_NAME = os.getenv("DB_NAME", "audio_analyzer")
DB_USER = os.getenv("DB_USER", "postgres")
DB_PASSWORD = os.getenv("DB_PASSWORD", "postgres")
db_url = f"postgresql://{DB_USER}:{DB_PASSWORD}@{DB_HOST}:{DB_PORT}/{DB_NAME}"
# Create service
service = CollectorService(
serial_port=SERIAL_PORT, db_url=db_url, baudrate=BAUDRATE
)
# Setup signal handlers for graceful shutdown
loop = asyncio.get_event_loop()
for sig in (signal.SIGTERM, signal.SIGINT):
loop.add_signal_handler(sig, service.shutdown)
try:
# Run service
loop.run_until_complete(service.start())
except KeyboardInterrupt:
logger.info("Interrupted by user")
except Exception as e:
logger.error(f"Service error: {e}")
sys.exit(1)
finally:
loop.close()
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,41 @@
#!/usr/bin/env python3
import argparse
import time
import serial
from protocol_parser import ProtocolParser
def main():
ap = argparse.ArgumentParser(description="FR-1.4 binary stream monitor")
ap.add_argument("--port", default="/dev/ttyACM0")
ap.add_argument("--baud", type=int, default=115200)
ap.add_argument("--timeout", type=float, default=0.2)
args = ap.parse_args()
parser = ProtocolParser()
with serial.Serial(args.port, args.baud, timeout=args.timeout) as ser:
while True:
data = ser.read(ser.in_waiting or 1)
if not data:
continue
packets = parser.feed(data)
st = parser.get_stats()
for pkt in packets:
# Одна строка на пакет + счётчик CRC ошибок
print(
f"{pkt.timestamp_ms:010d} "
f"rms_db={pkt.rms_db:+6.1f} "
f"freq_hz={pkt.freq_hz:4d} "
f"crc_err={st.crc_errors}"
)
time.sleep(0.001)
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,136 @@
#!/usr/bin/env python3
"""
FR-1.4 Data Transmission Protocol Parser (v1)
12-byte binary packet: [0xAA][TYPE=0x02][LEN=8][TIMESTAMP(4)][RMS_DB(2)][FREQ_HZ(2)][CRC8(1)]
"""
import struct
from typing import Optional, NamedTuple
from dataclasses import dataclass
class AudioMetrics(NamedTuple):
"""Parsed audio metrics packet"""
timestamp_ms: int
rms_db: float
freq_hz: int
valid: bool = True
@dataclass
class ProtocolStats:
"""Protocol statistics"""
packets_received: int = 0
crc_errors: int = 0
length_errors: int = 0
range_errors: int = 0
class ProtocolParser:
"""
Stream parser for FR-1.4 protocol with automatic resynchronization.
"""
SOF = 0xAA
TYPE_AUDIO_V1 = 0x02
PAYLOAD_LEN = 0x08
PACKET_SIZE = 12
def __init__(self):
self.buffer = bytearray()
self.stats = ProtocolStats()
@staticmethod
def _crc8_atm(data: bytes) -> int:
"""CRC-8/ATM: poly=0x07, init=0x00, refin=false, refout=false, xorout=0x00"""
crc = 0x00
for byte in data:
crc ^= byte
for _ in range(8):
if crc & 0x80:
crc = ((crc << 1) ^ 0x07) & 0xFF
else:
crc = (crc << 1) & 0xFF
return crc
def feed(self, data: bytes) -> list[AudioMetrics]:
"""
Feed incoming bytes, return list of parsed packets.
Args:
data: Raw bytes from serial port
Returns:
List of successfully parsed AudioMetrics
"""
self.buffer.extend(data)
packets = []
while len(self.buffer) >= self.PACKET_SIZE:
# Find SOF
sof_idx = self.buffer.find(self.SOF)
if sof_idx == -1:
# No SOF found, discard all but last byte
self.buffer = self.buffer[-1:]
break
# Discard bytes before SOF
if sof_idx > 0:
self.buffer = self.buffer[sof_idx:]
# Need at least 3 bytes for SOF + TYPE + LEN
if len(self.buffer) < 3:
break
packet_type = self.buffer[1]
payload_len = self.buffer[2]
# Validate TYPE and LEN
if packet_type != self.TYPE_AUDIO_V1 or payload_len != self.PAYLOAD_LEN:
self.stats.length_errors += 1
self.buffer.pop(0) # Remove false SOF, retry
continue
# Full packet size = SOF(1) + TYPE(1) + LEN(1) + PAYLOAD(8) + CRC(1) = 12
total_len = 3 + payload_len + 1
if len(self.buffer) < total_len:
break # Wait for more data
packet = bytes(self.buffer[:total_len])
# Verify CRC (over bytes 1..10: TYPE, LEN, payload)
crc_data = packet[1:11]
expected_crc = packet[11]
calculated_crc = self._crc8_atm(crc_data)
if calculated_crc != expected_crc:
self.stats.crc_errors += 1
self.buffer.pop(0) # Remove bad packet, retry
continue
# Parse payload (little-endian)
timestamp_ms, rms_db_x10, freq_hz = struct.unpack_from('<IhH', packet, 3)
# Convert and validate ranges
rms_db = rms_db_x10 / 10.0
valid = True
if not (-40.0 <= rms_db <= 80.0) or not (100 <= freq_hz <= 8000):
self.stats.range_errors += 1
valid = False
self.stats.packets_received += 1
packets.append(AudioMetrics(
timestamp_ms=timestamp_ms,
rms_db=rms_db,
freq_hz=freq_hz,
valid=valid
))
# Remove processed packet
self.buffer = self.buffer[total_len:]
return packets
def get_stats(self) -> ProtocolStats:
"""Get current statistics"""
return self.stats

View File

@@ -0,0 +1,56 @@
#!/usr/bin/env python3
"""
Example client for FR-1.4 protocol
"""
import serial
import time
from protocol_parser import ProtocolParser
def main():
SERIAL_PORT = "/dev/ttyACM0"
BAUDRATE = 115200
parser = ProtocolParser()
try:
with serial.Serial(SERIAL_PORT, BAUDRATE, timeout=1) as ser:
print(f"Connected to {SERIAL_PORT}")
while True:
# Read available data
if ser.in_waiting > 0:
data = ser.read(ser.in_waiting)
# Parse packets
packets = parser.feed(data)
for pkt in packets:
if pkt.valid:
print(
f"[{pkt.timestamp_ms:010d}] "
f"RMS: {pkt.rms_db:+6.1f} dB "
f"Freq: {pkt.freq_hz:4d} Hz"
)
else:
print(f"[WARN] Invalid packet: {pkt}")
# Show stats every 30 packets
stats = parser.get_stats()
if stats.packets_received % 30 == 0 and stats.packets_received > 0:
print(
f"Stats: RX={stats.packets_received} "
f"CRC_err={stats.crc_errors} "
f"LEN_err={stats.length_errors} "
f"Range_err={stats.range_errors}"
)
time.sleep(0.01)
except KeyboardInterrupt:
print("\nExiting...")
except serial.SerialException as e:
print(f"Error: {e}")
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,5 @@
pyserial
asyncpg
numpy
pytest
pytest-asyncio

View File

@@ -0,0 +1,122 @@
#!/usr/bin/env python3
"""
FR-2.1: Asynchronous Serial Reader
Reads binary packets from STM32 via USB CDC
"""
import asyncio
import logging
from typing import Callable, Awaitable
import serial
from serial import SerialException
from protocol_parser import ProtocolParser, AudioMetrics
logger = logging.getLogger(__name__)
class SerialReader:
"""
Asynchronous serial port reader with protocol parsing.
"""
def __init__(
self,
port: str,
baudrate: int = 115200,
on_packet: Callable[[AudioMetrics], Awaitable[None]] = None,
):
"""
Initialize serial reader.
Args:
port: Serial port path (e.g., /dev/ttyACM0)
baudrate: Serial baudrate (default 115200)
on_packet: Async callback for each received packet
"""
self.port = port
self.baudrate = baudrate
self.on_packet = on_packet
self.serial: serial.Serial = None
self.parser = ProtocolParser()
self._running = False
self._read_task: asyncio.Task = None
async def connect(self):
"""Open serial port connection"""
try:
self.serial = serial.Serial(
self.port, self.baudrate, timeout=0.1, write_timeout=1.0
)
logger.info(f"Connected to {self.port} @ {self.baudrate} baud")
except SerialException as e:
logger.error(f"Failed to open serial port {self.port}: {e}")
raise
async def disconnect(self):
"""Close serial port connection"""
self._running = False
if self._read_task and not self._read_task.done():
self._read_task.cancel()
try:
await self._read_task
except asyncio.CancelledError:
pass
if self.serial and self.serial.is_open:
self.serial.close()
logger.info("Serial port closed")
async def start_reading(self):
"""Start background task for reading serial data"""
self._running = True
self._read_task = asyncio.create_task(self._read_loop())
async def _read_loop(self):
"""Background task: continuously read and parse serial data"""
while self._running:
try:
# Read available data (non-blocking due to timeout)
if self.serial.in_waiting > 0:
data = self.serial.read(self.serial.in_waiting or 1)
if data:
# Parse packets
packets = self.parser.feed(data)
# Process each packet
for packet in packets:
if self.on_packet:
try:
await self.on_packet(packet)
except Exception as e:
logger.error(f"Error in packet handler: {e}")
# Log statistics periodically
stats = self.parser.get_stats()
if stats.packets_received % 100 == 0:
logger.info(
f"Stats: RX={stats.packets_received} "
f"CRC_err={stats.crc_errors} "
f"LEN_err={stats.length_errors} "
f"Range_err={stats.range_errors}"
)
# Small delay to prevent CPU spinning
await asyncio.sleep(0.01)
except SerialException as e:
logger.error(f"Serial read error: {e}")
self._running = False
break
except asyncio.CancelledError:
break
except Exception as e:
logger.error(f"Unexpected error in read loop: {e}")
def get_stats(self):
"""Get protocol parser statistics"""
return self.parser.get_stats()

View File

@@ -0,0 +1,81 @@
import struct
import pytest
import sys
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent.parent))
from protocol_parser import ProtocolParser
def build_packet(timestamp_ms: int, rms_db_x10: int, freq_hz: int) -> bytes:
sof = bytes([ProtocolParser.SOF])
header = bytes([ProtocolParser.TYPE_AUDIO_V1, ProtocolParser.PAYLOAD_LEN])
payload = struct.pack("<IhH", timestamp_ms, rms_db_x10, freq_hz)
crc_data = header + payload # bytes 1..10 in the wire format
crc = ProtocolParser._crc8_atm(crc_data)
return sof + header + payload + bytes([crc])
def test_valid_packet():
p = ProtocolParser()
raw = build_packet(timestamp_ms=1234, rms_db_x10=-123, freq_hz=440)
packets = p.feed(raw)
assert len(packets) == 1
pkt = packets[0]
assert pkt.valid is True
assert pkt.timestamp_ms == 1234
assert pkt.rms_db == -12.3
assert pkt.freq_hz == 440
st = p.get_stats()
assert st.packets_received == 1
assert st.crc_errors == 0
def test_bad_crc_packet():
p = ProtocolParser()
raw = bytearray(build_packet(timestamp_ms=1, rms_db_x10=-10, freq_hz=1000))
raw[-1] ^= 0xFF # ломаем CRC
packets = p.feed(bytes(raw))
assert packets == []
st = p.get_stats()
assert st.packets_received == 0
assert st.crc_errors == 1
def test_garbage_then_valid_packet_resync():
p = ProtocolParser()
garbage = b"\x00\xff\xaa\x01\x02\x03\x04" # содержит ложный SOF и мусор
raw = garbage + build_packet(timestamp_ms=777, rms_db_x10=-321, freq_hz=1234)
packets = p.feed(raw)
assert len(packets) == 1
assert packets[0].timestamp_ms == 777
assert packets[0].rms_db == -32.1
assert packets[0].freq_hz == 1234
assert p.get_stats().length_errors >= 1 # парсер должен "проглотить" мусор
def test_two_packets_in_one_chunk():
p = ProtocolParser()
raw = build_packet(timestamp_ms=10, rms_db_x10=-100, freq_hz=500) + build_packet(
timestamp_ms=20, rms_db_x10=-200, freq_hz=600
)
packets = p.feed(raw)
assert len(packets) == 2
assert packets[0].timestamp_ms == 10
assert packets[1].timestamp_ms == 20

View File

@@ -1,40 +0,0 @@
#!/usr/bin/env python3
import threading
import time
import serial
PORT = "/dev/ttyACM0"
BAUDRATE = 115200
def read_from_port(ser):
while True:
if ser.in_waiting > 0:
data = ser.read(ser.in_waiting)
print(f"Received: {data.decode('utf-8', errors='ignore')}")
try:
ser = serial.Serial(PORT, BAUDRATE, timeout=1)
print(f"Connected to {PORT}")
# Запускаем чтение в фоне
thread = threading.Thread(target=read_from_port, args=(ser,), daemon=True)
thread.start()
# Пишем данные
counter = 0
while True:
msg = f"Ping {counter}\n"
ser.write(msg.encode("utf-8"))
print(f"Sent: {msg.strip()}")
counter += 1
time.sleep(1)
except serial.SerialException as e:
print(f"Error: {e}")
except KeyboardInterrupt:
print("\nExiting...")
if "ser" in locals() and ser.is_open:
ser.close()