ledstick/python/plotserial_fft.py
Patrick Moessler fc4005cc49 v2
2025-02-24 00:29:15 +01:00

114 lines
2.4 KiB
Python

# /// script
# requires-python = ">=3.13"
# dependencies = [
# "matplotlib",
# "numpy",
# "pyserial",
# ]
# ///
from struct import unpack
from threading import Thread
import matplotlib.pyplot as plt
import numpy as np
from matplotlib.animation import FuncAnimation
from serial import Serial
# Marker the receiver looks for on the serial stream before reading a block.
SYNC = bytes((0x00, 0x11, 0x80, 0x00, 0x7F, 0xFF, 0x11, 0x00))

# Number of samples drawn per plotted line.
BLOCK_LENGTH = 256

# Bytes read per block: 2 * BLOCK_LENGTH 16-bit values, 2 bytes each.
BLOCK_SIZE = BLOCK_LENGTH * 2 * 2

# Raw blocks queued by the receiver thread and consumed by the plot callback.
data: list[bytes] = []

# Cleared by on_close() to ask the receiver loop to stop.
do_recv = True

# Receiver thread handle; set by main() once the thread is created.
recv_thread = None
def recv_main():
    """Receive sample blocks from the serial port until ``do_recv`` is cleared.

    Intended to run on the receiver thread. Each iteration waits for the
    ``SYNC`` marker and then appends the following ``BLOCK_SIZE`` bytes to the
    module-level ``data`` list. Only complete blocks are ever appended.

    The port is opened with a read timeout so that a silent line cannot block
    the thread forever: every read wakes up periodically and re-checks
    ``do_recv``, which lets ``on_close`` join the thread promptly. The port is
    closed when the function returns or raises.
    """

    def read_exact(ser, count):
        """Read exactly ``count`` bytes, or return ``None`` if stopped first."""
        buf = bytearray()
        while len(buf) < count:
            if not do_recv:
                return None
            # With a timeout configured, read() may return fewer bytes than
            # requested (possibly none), so keep accumulating.
            buf += ser.read(count - len(buf))
        return bytes(buf)

    def sync(ser):
        """Consume input up to and including the next ``SYNC`` marker.

        Returns ``True`` once the marker has been read, ``False`` if reception
        was stopped before it was found.
        """
        window = read_exact(ser, len(SYNC))
        if window is None:
            return False
        if window == SYNC:
            return True

        print("not in sync...")
        # Slide a SYNC-sized window over the stream one byte at a time.
        window = bytearray(window)
        while window != SYNC:
            nxt = read_exact(ser, 1)
            if nxt is None:
                return False
            del window[0]
            window += nxt
        print("synced")
        return True

    with Serial("COM5", baudrate=1000000, timeout=0.1) as ser:
        while do_recv:
            if not sync(ser):
                break
            block = read_exact(ser, BLOCK_SIZE)
            if block is None:
                break
            data.append(block)
def main() -> None:
    """Plot incoming sample blocks live while a background thread receives them.

    Creates a figure with two line plots of ``BLOCK_LENGTH`` points each,
    starts ``recv_main`` on a thread stored in the global ``recv_thread``, and
    on every animation tick redraws the first line from the oldest block
    queued in ``data``. Blocks until the plot window is closed.

    The receiver thread is stopped via ``on_close`` both on the window's
    close event and whenever ``plt.show()`` exits for any other reason, so a
    non-daemon receiver thread cannot keep the process alive afterwards.
    """
    global recv_thread

    x1 = np.arange(BLOCK_LENGTH)
    y1 = np.zeros(BLOCK_LENGTH)
    x2 = np.arange(BLOCK_LENGTH)
    y2 = np.zeros(BLOCK_LENGTH)

    fig, ax = plt.subplots()
    fig.canvas.mpl_connect("close_event", on_close)
    graph1 = ax.plot(x1, y1)[0]
    graph2 = ax.plot(x2, y2)[0]
    plt.ylim(-32767, 32768)
    np.set_printoptions(suppress=True, precision=2)

    recv_thread = Thread(target=recv_main)
    recv_thread.start()

    def update(frame):
        """Animation callback: draw the oldest queued block, if there is one."""
        if not data:
            return
        block = data.pop(0)

        # If more than four blocks are still waiting, report it and drop the
        # backlog so the display does not fall further behind.
        if len(data) > 4:
            print(f"buffer overflow: {len(data)}")
            data.clear()

        # A block holds 2 * BLOCK_LENGTH 16-bit values; only the first
        # BLOCK_LENGTH of them are plotted.
        values = unpack(f"{2*BLOCK_LENGTH}h", block)
        y1[:] = values[:BLOCK_LENGTH]
        y2[:] = abs(y1)
        graph1.set_ydata(y1)
        # Second line is computed but its redraw is currently disabled:
        # graph2.set_ydata(y2)

    # Keep a reference to the animation for as long as the window is shown.
    # NOTE(review): Matplotlib documents `interval` as milliseconds, and
    # 64 / 24000 is ~0.0027 -- confirm the intended frame delay.
    anim = FuncAnimation(
        fig, update, frames=None, cache_frame_data=False, interval=64 / 24000
    )
    try:
        plt.show()
    finally:
        # Also covers exits that never fire the close event (Ctrl+C, errors).
        # Calling it again after a normal close is harmless.
        on_close(None)
def on_close(_):
    """Stop the receiver: clear ``do_recv`` and wait for the thread to finish.

    Connected as the figure's ``close_event`` handler; the event argument is
    ignored. Does nothing beyond clearing the flag when no thread was started.
    """
    global do_recv

    do_recv = False
    worker = recv_thread
    if not worker:
        return
    worker.join()
# Start the plotter only when this file is executed as a script.
if __name__ == "__main__":
    main()