ledstick/python/plotserial_fft_float.py
Patrick Moessler f5e663b724 graph updates
2025-03-09 03:12:28 +01:00

227 lines
6 KiB
Python

# /// script
# requires-python = ">=3.13"
# dependencies = [
# "matplotlib",
# "numpy",
# "pyserial",
# ]
# ///
from struct import Struct
from threading import Thread
from typing import Iterable
import matplotlib.pyplot as plt
import numpy as np
from matplotlib.animation import FuncAnimation
from serial import Serial
# Byte sequence the receiver looks for in front of every block on the
# serial stream.
SYNC = bytes.fromhex("001180007fff1100")
# Number of float samples in the trailing array of a block (plotted on the
# "fft" graph).
BLOCK_LENGTH = 512
# Number of per-band power values in a block (indexed as bass, mid, treble
# by the plotting code).
BANDS_COUNT = 3
# Payload layout: one unsigned int (floating max), BANDS_COUNT floats
# (average powers), BANDS_COUNT floats (current powers) and BLOCK_LENGTH
# floats. No byte-order prefix is given, so the struct module's native
# mode applies.
BlockStruct = Struct(f"I{BANDS_COUNT}f{BANDS_COUNT}f{BLOCK_LENGTH}f")
# Packed payloads appended by recv_main() and popped by the animation
# callback in main().
data: list[bytes] = []
# Loop condition of recv_main(); on_close() sets it to False.
do_recv: bool = True
# Handle of the receiver thread; assigned in main(), joined in on_close().
recv_thread: Thread | None = None
def recv_main():
    """Receive framed blocks from the serial port until ``do_recv`` is cleared.

    Each frame on the wire is the ``SYNC`` marker followed by
    ``BlockStruct.size`` payload bytes. Every payload is appended, still
    packed, to the module-level ``data`` list.

    The port is opened without a timeout, so (per pyserial's defaults) reads
    block until the requested number of bytes has arrived. ``do_recv`` is
    therefore only re-checked after a complete frame has been received.
    """
    # The context manager releases the port when the loop ends or when an
    # exception escapes from a read.
    with Serial("COM5", baudrate=1000000) as ser:

        def sync():
            """Consume bytes up to and including the next ``SYNC`` marker."""
            syncbuf = bytearray(ser.read(len(SYNC)))
            if syncbuf == SYNC:
                return
            print("not in sync...")
            # Slide a len(SYNC)-byte window over the stream, one byte at a
            # time, until the window equals the marker.
            while syncbuf != SYNC:
                syncbuf.append(ser.read(1)[0])
                del syncbuf[0]
            print("synced")

        while do_recv:
            sync()
            data.append(ser.read(BlockStruct.size))
class Graph:
    """One line trace on an axes, backed by a fixed-length sample buffer.

    ``x`` holds the sample indices ``0..length-1`` and ``y`` the current
    values. ``updating`` controls whether new values are pushed to the
    plotted line; the buffer itself is always refreshed.
    """

    def __init__(
        self,
        ax,
        length: int,
        limits: tuple[float, float],
        offset: float = 0,
        scale: float = 1,
    ) -> None:
        """Plot a zero-filled line of ``length`` points on ``ax``.

        Args:
            ax: Axes-like object providing ``plot`` and ``set_ylim``.
            length: Number of points in the trace.
            limits: Lower and upper y-limit applied to ``ax``.
            offset: Value added to incoming data after scaling.
            scale: Factor incoming data is multiplied by.
        """
        self.ax = ax
        self.scale = scale
        self.offset = offset
        self.updating = True
        self.x = np.arange(length)
        self.y = np.zeros(length)
        plotted = ax.plot(self.x, self.y)
        self.line = plotted[0]
        ax.set_ylim(*limits)

    def update(self, new_data: Iterable[float], prescaled: bool = False) -> None:
        """Replace the whole buffer with ``new_data`` and refresh the line.

        Args:
            new_data: Values copied into the ``y`` buffer.
            prescaled: When false, the values are multiplied by ``scale``
                and then shifted by ``offset``; when true they are used
                as given.
        """
        buffer = self.y
        buffer[...] = new_data
        if not prescaled:
            np.multiply(buffer, self.scale, out=buffer)
            np.add(buffer, self.offset, out=buffer)
        if not self.updating:
            return
        self.line.set_ydata(buffer)
class RollingGraph(Graph):
    """Graph that scrolls: each update drops the oldest sample and appends one."""

    def update(self, new_data: float, prescaled: bool = False):
        """Shift the buffer left by one and store ``new_data`` in the last slot.

        Args:
            new_data: The newest sample.
            prescaled: When false, ``scale`` and ``offset`` are applied to
                the new sample only; samples already in the buffer are
                left as they are.
        """
        shifted = np.empty_like(self.y)
        shifted[:-1] = self.y[1:]
        shifted[-1] = new_data
        if not prescaled:
            shifted[-1] = shifted[-1] * self.scale + self.offset
        # The buffer is handed over as already scaled so the base class
        # does not apply scale/offset to the whole history again.
        return super().update(shifted, prescaled=True)
def main() -> None:
    """Build the figure, start the receiver thread and run the animation.

    Five stacked axes are created: the block spectrum ("fft"), the rolling
    floating-max trace, and one axes each for bass, mid and treble. Every
    band axes carries three rolling traces: current power, average power
    and a marker trace that jumps to 80 % of the axes' upper limit while
    the current power exceeds 1.3 times the average.

    Blocks until the figure window is closed.
    """
    global recv_thread

    fig, ax = plt.subplots(5)
    fig.canvas.mpl_connect("close_event", on_close)

    offset = 0
    history = 200
    fft_limits = (0, 1)
    # (name, axes, y-limits) per band. Position in this tuple is the index
    # used into the average/current power arrays of a block.
    bands = (
        ("bass", ax[2], (0, 1)),
        ("mid", ax[3], (0, 0.5)),
        ("treble", ax[4], (0, 0.3)),
    )

    graphs: dict[str, Graph] = {
        "fft": Graph(ax[0], BLOCK_LENGTH, fft_limits, offset),
        "floating_max": RollingGraph(ax[1], history, (-2, 2)),
    }
    # Insertion order is deliberate: all current-power traces first, then
    # all averages, then all markers. It fixes the order in which lines are
    # added to each band axes and the order artists are returned for
    # blitting.
    for suffix in ("", "_avg", "_mark"):
        for name, band_ax, limits in bands:
            graphs[f"{name}{suffix}"] = RollingGraph(
                band_ax, history, limits, offset
            )

    np.set_printoptions(suppress=True, precision=2)

    recv_thread = Thread(target=recv_main)
    recv_thread.start()

    # All traces are live. Set ``graphs[name].updating = False`` after this
    # loop to freeze an individual trace.
    for graph in graphs.values():
        graph.updating = True

    def update(frame):
        """Plot the oldest queued block and return the artists to redraw.

        Returns an empty list when no block is queued. ``frame`` is
        supplied by FuncAnimation and is not used.
        """
        if not data:
            return []
        block = data.pop(0)
        # The plot is falling behind the receiver: drop the backlog so the
        # display stays close to live.
        if len(data) > 4:
            print(f"buffer overflow: {len(data)}")
            data.clear()

        values = BlockStruct.unpack(block)
        # Field offsets inside the unpacked tuple.
        avg_start = 1
        cur_start = avg_start + BANDS_COUNT
        fft_start = cur_start + BANDS_COUNT

        floating_max = values[0]
        avg_powers = values[avg_start:cur_start]
        cur_powers = values[cur_start:fft_start]

        graphs["fft"].update(values[fft_start : fft_start + BLOCK_LENGTH])
        graphs["floating_max"].update(np.log10(60 * floating_max / (2**31)))

        for index, (name, _, _) in enumerate(bands):
            current = cur_powers[index]
            average = avg_powers[index]
            graphs[name].update(current)
            graphs[f"{name}_avg"].update(average)
            marker = graphs[f"{name}_mark"]
            # Spike to 80 % of the axes height while the current power is
            # more than 1.3x the average; otherwise follow the average.
            marker.update(
                marker.ax.get_ylim()[1] * 0.8
                if current > 1.3 * average
                else average
            )

        return [g.line for g in graphs.values() if g.updating]

    # The animation object must stay referenced for as long as the figure
    # is shown, otherwise it can be garbage collected and stop running.
    # NOTE(review): FuncAnimation interprets ``interval`` as milliseconds,
    # so 32 / 24000 is ~0.0013 ms, i.e. effectively "as fast as possible".
    # Confirm whether 1000 * 32 / 24000 was intended.
    anim = FuncAnimation(
        fig,
        update,
        blit=True,
        frames=None,
        cache_frame_data=False,
        interval=32 / 24000,
    )
    plt.show()
def on_close(_):
    """Figure close handler: stop the receiver loop and wait for its thread.

    Clears ``do_recv`` and, if a receiver thread was started, joins it.
    The single argument is the event passed by the canvas and is ignored.
    """
    global do_recv
    do_recv = False
    worker = recv_thread
    if not worker:
        return
    worker.join()
# Script entry point: run the plotter only when executed directly, not when
# the module is imported.
if __name__ == "__main__":
    main()