114 lines
2.4 KiB
Python
114 lines
2.4 KiB
Python
# /// script
|
|
# requires-python = ">=3.13"
|
|
# dependencies = [
|
|
# "matplotlib",
|
|
# "numpy",
|
|
# "pyserial",
|
|
# ]
|
|
# ///
|
|
|
|
from struct import unpack
|
|
from threading import Thread
|
|
|
|
import matplotlib.pyplot as plt
|
|
import numpy as np
|
|
from matplotlib.animation import FuncAnimation
|
|
from serial import Serial
|
|
|
|
SYNC = bytes.fromhex("001180007fff1100")
|
|
BLOCK_LENGTH = 256
|
|
BLOCK_SIZE = 2 * 2 * BLOCK_LENGTH
|
|
|
|
data: list[bytes] = []
|
|
|
|
|
|
do_recv = True
|
|
recv_thread = None
|
|
|
|
|
|
def recv_main():
|
|
ser = Serial("COM5", baudrate=1000000)
|
|
|
|
def sync():
|
|
syncbuf = bytearray(ser.read(len(SYNC)))
|
|
if syncbuf[: len(SYNC)] == SYNC:
|
|
return
|
|
print("not in sync...")
|
|
synced = False
|
|
while not synced:
|
|
if syncbuf[: len(SYNC)] == SYNC:
|
|
synced = True
|
|
break
|
|
syncbuf.append(ser.read(1)[0])
|
|
syncbuf.pop(0)
|
|
if not synced:
|
|
raise ConnectionError
|
|
print("synced")
|
|
|
|
while do_recv:
|
|
sync()
|
|
data.append(ser.read(BLOCK_SIZE))
|
|
|
|
|
|
def main() -> None:
|
|
global recv_thread
|
|
x1 = np.arange(BLOCK_LENGTH)
|
|
y1 = np.zeros(BLOCK_LENGTH)
|
|
x2 = np.arange(BLOCK_LENGTH)
|
|
y2 = np.zeros(BLOCK_LENGTH)
|
|
|
|
fig, ax = plt.subplots()
|
|
fig.canvas.mpl_connect("close_event", on_close)
|
|
graph1 = ax.plot(x1, y1)[0]
|
|
graph2 = ax.plot(x2, y2)[0]
|
|
plt.ylim(-32767, 32768)
|
|
|
|
np.set_printoptions(suppress=True, precision=2)
|
|
|
|
recv_thread = Thread(target=recv_main)
|
|
|
|
recv_thread.start()
|
|
|
|
def update(frame):
|
|
|
|
if data:
|
|
block = data.pop(0)
|
|
else:
|
|
return
|
|
if len(data) > 4:
|
|
print(f"buffer overflow: {len(data)}")
|
|
data.clear()
|
|
|
|
values = unpack(f"{2*BLOCK_LENGTH}h", block)
|
|
y1[:] = values[:BLOCK_LENGTH]
|
|
y1[-1] = values[BLOCK_LENGTH - 1]
|
|
y2[:] = abs(y1)
|
|
|
|
graph1.set_ydata(y1)
|
|
# graph2.set_ydata(y2)
|
|
|
|
anim = FuncAnimation(
|
|
fig, update, frames=None, cache_frame_data=False, interval=64 / 24000
|
|
)
|
|
plt.show()
|
|
|
|
# while True:
|
|
# block = ser.read(2 * BLOCK_LENGTH)
|
|
# values = unpack(f"{BLOCK_LENGTH}h", block)
|
|
# y[:] = values[:]
|
|
# graph.set_ydata(y)
|
|
# print(values)
|
|
|
|
# n = int.from_bytes(ser.read(2), "little", signed=True)
|
|
# print(n)
|
|
|
|
|
|
def on_close(_):
|
|
global do_recv
|
|
do_recv = False
|
|
if recv_thread:
|
|
recv_thread.join()
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|