# File-viewer listing header: 227 lines, 6 KiB, Python
# /// script
# requires-python = ">=3.13"
# dependencies = [
#     "matplotlib",
#     "numpy",
#     "pyserial",
# ]
# ///
|
|
|
|
from struct import Struct
|
|
from threading import Thread
|
|
from typing import Iterable
|
|
|
|
import matplotlib.pyplot as plt
|
|
import numpy as np
|
|
from matplotlib.animation import FuncAnimation
|
|
from serial import Serial
|
|
|
|
SYNC = bytes.fromhex("001180007fff1100")
|
|
BLOCK_LENGTH = 512
|
|
BANDS_COUNT = 3
|
|
|
|
BlockStruct = Struct(f"I{BANDS_COUNT}f{BANDS_COUNT}f{BLOCK_LENGTH}f")
|
|
|
|
data: list[bytes] = []
|
|
|
|
|
|
do_recv = True
|
|
recv_thread = None
|
|
|
|
|
|
def recv_main():
    """Receive blocks from the serial port and queue them in ``data``.

    Runs until the module-level ``do_recv`` flag is cleared. Each iteration
    first aligns on the ``SYNC`` byte sequence, then reads ``BlockStruct.size``
    bytes and appends them (still packed) to ``data``.

    The port is opened with a read timeout so that the loop notices
    ``do_recv`` being cleared even when no bytes arrive, and it is closed
    when the function returns or raises.
    """
    sync_len = len(SYNC)

    # The timeout only bounds how long one read() may block; read_exact()
    # below keeps partial data across timeouts, so no bytes are dropped.
    with Serial("COM5", baudrate=1000000, timeout=0.5) as ser:

        def read_exact(count: int) -> bytes | None:
            """Return exactly ``count`` bytes, or None once do_recv is cleared."""
            buf = bytearray()
            while len(buf) < count:
                if not do_recv:
                    return None
                buf += ser.read(count - len(buf))
            return bytes(buf)

        def sync() -> bool:
            """Consume input up to and including SYNC; False if asked to stop."""
            head = read_exact(sync_len)
            if head is None:
                return False
            if head == SYNC:
                return True

            print("not in sync...")
            # Slide a SYNC-sized window over the stream one byte at a time.
            window = bytearray(head)
            while window != SYNC:
                nxt = read_exact(1)
                if nxt is None:
                    return False
                window += nxt
                del window[0]
            print("synced")
            return True

        while do_recv:
            if not sync():
                break
            block = read_exact(BlockStruct.size)
            if block is None:
                break
            data.append(block)
|
|
|
|
|
|
class Graph:
    """A fixed-length line drawn on a supplied axes object.

    The y-values live in ``self.y`` and are overwritten wholesale on every
    ``update`` call. Setting ``self.updating`` to False keeps ``self.y``
    current but stops pushing it to the plotted line.
    """

    def __init__(
        self,
        ax,
        length: int,
        limits: tuple[float, float],
        offset: float = 0,
        scale: float = 1,
    ) -> None:
        """Create the line on ``ax`` with ``length`` points, all zero.

        Args:
            ax: Axes-like object providing ``plot`` and ``set_ylim``.
            length: Number of points in the line.
            limits: ``(low, high)`` passed to ``ax.set_ylim``.
            offset: Added to incoming data after scaling.
            scale: Multiplied into incoming data before the offset.
        """
        self.x = np.arange(length)
        self.y = np.zeros(length)

        self.offset, self.scale = offset, scale

        self.ax = ax
        plotted = self.ax.plot(self.x, self.y)
        self.line = plotted[0]
        low, high = limits
        self.ax.set_ylim(low, high)
        self.updating = True

    def update(self, new_data: Iterable[float], prescaled: bool = False) -> None:
        """Replace the y-values with ``new_data`` and refresh the line.

        Args:
            new_data: Values assigned over the whole of ``self.y``.
            prescaled: When True, ``scale`` and ``offset`` are not applied.
        """
        self.y[:] = new_data
        if not prescaled:
            self.y *= self.scale
            self.y += self.offset
        if not self.updating:
            return
        self.line.set_ydata(self.y)
|
|
|
|
|
|
class RollingGraph(Graph):
    """A Graph that scrolls: each update appends one sample at the right."""

    def update(self, new_data: float, prescaled: bool = False):
        """Shift the history left by one point and store ``new_data`` last.

        Scale and offset are applied to the new sample only (unless
        ``prescaled`` is True); the shifted history is then handed to
        ``Graph.update`` as already-scaled data.
        """
        shifted = np.roll(self.y, -1)
        # One-element view onto the slot that receives the new sample.
        newest = shifted[-1:]
        newest[0] = new_data
        if not prescaled:
            newest *= self.scale
            newest += self.offset
        return super().update(shifted, prescaled=True)
|
|
|
|
|
|
def main() -> None:
    """Build the figure, start the receiver thread and run the animation.

    Five stacked axes are created: the raw block data ("fft"), a rolling
    log-scaled view of the block's leading integer ("floating_max"), and one
    axes per band (bass, mid, treble) showing the current value, the average
    value and a marker line. Blocks are taken from the shared ``data`` list,
    which ``recv_main`` fills on a background thread. Blocks until the
    figure window is closed.
    """
    global recv_thread

    fig, ax = plt.subplots(5)
    fig.canvas.mpl_connect("close_event", on_close)

    offset = 0
    fft_limits = (0, 1)
    # y-axis limits per band; key order fixes which axes each band gets.
    band_limits = {"bass": (0, 1), "mid": (0, 0.5), "treble": (0, 0.3)}
    band_names = tuple(band_limits)

    graphs: dict[str, Graph] = {
        "fft": Graph(ax[0], BLOCK_LENGTH, fft_limits, offset),
        "floating_max": RollingGraph(ax[1], 200, (-2, 2)),
    }
    # Create all current-value lines first, then averages, then markers, so
    # each band's axes gets its three lines in that order.
    for suffix in ("", "_avg", "_mark"):
        for axis, band in zip(ax[2:], band_names):
            graphs[band + suffix] = RollingGraph(
                axis, 200, band_limits[band], offset
            )

    np.set_printoptions(suppress=True, precision=2)

    recv_thread = Thread(target=recv_main)
    recv_thread.start()

    # Add a graph name here to stop redrawing that line (its data is still
    # tracked); by default every graph is redrawn.
    frozen: set[str] = set()
    for name, graph in graphs.items():
        graph.updating = name not in frozen

    def update(frame):
        """Animation callback: plot the oldest queued block, if any."""
        if not data:
            return []
        block = data.pop(0)

        # If the plot falls behind the receiver, drop the backlog rather
        # than letting the display lag further and further.
        if len(data) > 4:
            print(f"buffer overflow: {len(data)}")
            data.clear()

        # A block of the wrong length cannot be unpacked; skip it instead of
        # raising inside the animation callback.
        if len(block) != BlockStruct.size:
            return []
        values = BlockStruct.unpack(block)

        # Field offsets follow the BlockStruct layout.
        avg_start = 1
        cur_start = avg_start + BANDS_COUNT
        fft_start = cur_start + BANDS_COUNT

        floating_max = values[0]
        avg_powers = values[avg_start:cur_start]
        cur_powers = values[cur_start:fft_start]

        graphs["fft"].update(values[fft_start : fft_start + BLOCK_LENGTH])

        # Debug aid:
        # print(
        #     f"""
        # floating_max:{floating_max}
        # avg_pow:{avg_powers}
        # cur_pow:{cur_powers}
        # min:{np.min(graphs["fft"].y)}
        # max:{np.max(graphs["fft"].y)}
        # average:{np.average(graphs["fft"].y)}
        # median:{np.median(graphs["fft"].y)}"""
        # )

        # Clamp to 1 so a zero reading does not become log10(0) == -inf.
        graphs["floating_max"].update(
            np.log10(60 * max(floating_max, 1) / (2**31))
        )

        for index, band in enumerate(band_names):
            current = cur_powers[index]
            average = avg_powers[index]
            graphs[band].update(current)
            graphs[f"{band}_avg"].update(average)
            # The marker jumps to 80% of the axes' upper limit while the
            # current value exceeds 1.3x the average, else tracks the average.
            marker = graphs[f"{band}_mark"]
            marker.update(
                marker.ax.get_ylim()[1] * 0.8
                if current > 1.3 * average
                else average
            )

        return [g.line for g in graphs.values() if g.updating]

    # Keep a reference: the animation stops if the object is garbage
    # collected while the window is open.
    # NOTE(review): matplotlib documents `interval` in milliseconds, while
    # 32 / 24000 looks like a period in seconds - confirm the intended value.
    anim = FuncAnimation(
        fig,
        update,
        blit=True,
        frames=None,
        cache_frame_data=False,
        interval=32 / 24000,
    )
    plt.show()
|
|
|
|
|
|
def on_close(_):
    """Figure close handler: stop the receiver loop and wait for its thread.

    The single argument (the event object passed by the caller) is ignored.
    """
    global do_recv
    do_recv = False

    worker = recv_thread
    if not worker:
        # The receiver thread was never created; nothing to wait for.
        return
    worker.join()
|
|
|
|
|
|
# Start the visualiser only when this file is run as a script, not on import.
if __name__ == "__main__":
    main()
|