graph updates

This commit is contained in:
Patrick Moessler 2025-03-09 03:12:28 +01:00
parent 5a18809a70
commit f5e663b724

View file

@ -9,6 +9,7 @@
from struct import Struct
from threading import Thread
from typing import Iterable
import matplotlib.pyplot as plt
import numpy as np
@ -16,7 +17,7 @@ from matplotlib.animation import FuncAnimation
from serial import Serial
SYNC = bytes.fromhex("001180007fff1100")
BLOCK_LENGTH = 256
BLOCK_LENGTH = 512
BANDS_COUNT = 3
BlockStruct = Struct(f"I{BANDS_COUNT}f{BANDS_COUNT}f{BLOCK_LENGTH}f")
@ -52,29 +53,79 @@ def recv_main():
data.append(ser.read(BlockStruct.size))
x1 = np.arange(BLOCK_LENGTH)
y1 = np.zeros(BLOCK_LENGTH)
x2 = np.arange(BLOCK_LENGTH)
y2 = np.zeros(BLOCK_LENGTH)
x3 = np.arange(BANDS_COUNT)
y3 = np.zeros(BANDS_COUNT)
x4 = np.arange(BANDS_COUNT)
y4 = np.zeros(BANDS_COUNT)
class Graph:
def __init__(
self,
ax,
length: int,
limits: tuple[float, float],
offset: float = 0,
scale: float = 1,
) -> None:
self.x = np.arange(length)
self.y = np.zeros(length)
self.offset = offset
self.scale = scale
self.ax = ax
self.line = self.ax.plot(self.x, self.y)[0]
self.ax.set_ylim(*limits)
self.updating = True
def update(self, new_data: Iterable[float], prescaled: bool = False) -> None:
self.y[:] = new_data
if not prescaled:
self.y *= self.scale
self.y += self.offset
if self.updating:
self.line.set_ydata(self.y)
class RollingGraph(Graph):
def update(self, new_data: float, prescaled: bool = False):
data = np.roll(self.y, -1)
data[-1] = new_data
if not prescaled:
data[-1] *= self.scale
data[-1] += self.offset
return super().update(data, prescaled=True)
def main() -> None:
global recv_thread
fig, ax = plt.subplots(3)
fig, ax = plt.subplots(5)
fig.canvas.mpl_connect("close_event", on_close)
graph1 = ax[0].plot(x1, y1, label="fft")[0]
graph2 = ax[1].plot(x2, y2, label="floating max")[0]
graph3 = ax[2].bar(x3, y3, label="avg")
graph4 = ax[2].bar(x4, y4, label="cur")
ax[0].set_ylim(0, 100)
ax[1].set_ylim(-2, 2)
ax[2].set_ylim(0, 100)
offset = 0
scale = (0, 1)
bass_scale = (0, 1)
mid_scale = (0, 0.5)
treble_scale = (0, 0.3)
graphs: dict[Graph] = {
"fft": Graph(ax[0], BLOCK_LENGTH, scale, offset),
"floating_max": RollingGraph(ax[1], 200, (-2, 2)),
"bass": RollingGraph(ax[2], 200, bass_scale, offset),
"mid": RollingGraph(ax[3], 200, mid_scale, offset),
"treble": RollingGraph(ax[4], 200, treble_scale, offset),
"bass_avg": RollingGraph(ax[2], 200, bass_scale, offset),
"mid_avg": RollingGraph(ax[3], 200, mid_scale, offset),
"treble_avg": RollingGraph(ax[4], 200, treble_scale, offset),
"bass_mark": RollingGraph(ax[2], 200, bass_scale, offset),
"mid_mark": RollingGraph(ax[3], 200, mid_scale, offset),
"treble_mark": RollingGraph(ax[4], 200, treble_scale, offset),
}
# graph1 = ax[0].plot(x1, y1, label="fft")[0]
# graph2 = ax[1].plot(x2, y2, label="floating max")[0]
# graph3 = ax[2].bar(x3, y3, label="avg")
# graph4 = ax[2].bar(x4, y4, label="cur")
# ax[0].set_ylim(0, 100)
# ax[1].set_ylim(-2, 2)
# ax[2].set_ylim(0, 100)
np.set_printoptions(suppress=True, precision=2)
@ -82,18 +133,27 @@ def main() -> None:
recv_thread.start()
def update(frame):
global x1, x2, x3, x4, y1, y2, y3, y4
graphs["fft"].updating = True
graphs["floating_max"].updating = True
graphs["bass"].updating = True
graphs["mid"].updating = True
graphs["treble"].updating = True
graphs["bass_avg"].updating = True
graphs["mid_avg"].updating = True
graphs["treble_avg"].updating = True
graphs["bass_mark"].updating = True
graphs["mid_mark"].updating = True
graphs["treble_mark"].updating = True
def update(frame):
if data:
block = data.pop(0)
else:
return
return []
if len(data) > 4:
print(f"buffer overflow: {len(data)}")
data.clear()
# print(block.hex())
values = BlockStruct.unpack(block)
i = 0
@ -104,44 +164,54 @@ def main() -> None:
cur_powers = values[i : i + BANDS_COUNT]
i += BANDS_COUNT
y1[:] = values[i : i + BLOCK_LENGTH]
graphs["fft"].update(values[i : i + BLOCK_LENGTH])
print(f"floating_max:{floating_max}")
print(f"avg_pow:{avg_powers}")
print(f"cur_pow:{cur_powers}")
print(f"min:{np.min(y1)}")
print(f"max:{np.max(y1)}")
print(f"average:{np.average(y1)}")
print(f"median:{np.median(y1)}")
# print(
# f"""
# floating_max:{floating_max}
# avg_pow:{avg_powers}
# cur_pow:{cur_powers}
# min:{np.min(graphs["fft"].y)}
# max:{np.max(graphs["fft"].y)}
# average:{np.average(graphs["fft"].y)}
# median:{np.median(graphs["fft"].y)}"""
# )
y2[:] = np.roll(y2, -1)
y2[-1] = np.log10((60 * floating_max / (2**31)))
graphs["floating_max"].update(np.log10((60 * floating_max / (2**31))))
y3[:] = avg_powers
y4[:] = cur_powers
graphs["bass"].update(cur_powers[0])
graphs["bass_avg"].update(avg_powers[0])
graphs["bass_mark"].update(
graphs["bass_mark"].ax.get_ylim()[1] * 0.8
if cur_powers[0] > 1.3 * avg_powers[0]
else avg_powers[0]
)
y1 += 100
y3 += 100
y4 += 100
graphs["mid"].update(cur_powers[1])
graphs["mid_avg"].update(avg_powers[1])
graphs["mid_mark"].update(
graphs["mid_mark"].ax.get_ylim()[1] * 0.8
if cur_powers[1] > 1.3 * avg_powers[1]
else avg_powers[1]
)
graph1.set_ydata(y1)
graph2.set_ydata(y2)
for i, b in enumerate(graph3):
b.set_height(y3[i])
for i, b in enumerate(graph4):
b.set_height(y4[i])
# graph3.set_height(y3)
# graph4.set_height(y4)
graphs["treble"].update(cur_powers[2])
graphs["treble_avg"].update(avg_powers[2])
graphs["treble_mark"].update(
graphs["treble_mark"].ax.get_ylim()[1] * 0.8
if cur_powers[2] > 1.3 * avg_powers[2]
else avg_powers[2]
)
return graph1, graph2, graph3, graph4
return [g.line for g in graphs.values() if g.updating]
anim = FuncAnimation(
fig,
update,
# blit=True,
blit=True,
frames=None,
cache_frame_data=False,
interval=64 / 24000,
interval=32 / 24000,
)
plt.show()