pybeamshow/util/audio.py

73 lines
2.1 KiB
Python
Raw Normal View History

2023-02-22 03:43:45 +01:00
import time
import aubio
import pyaudiowpatch as pa
import numpy as np
import threading
def find_audio_device(p: pa.PyAudio, api: int, name: str) -> dict:
    """Look up the first input-capable device of a host API by name fragment.

    Args:
        p: PyAudio instance used to query the host API and its devices.
        api: Host API type constant handed to ``get_host_api_info_by_type``.
        name: Substring looked for in each device's ``"name"`` entry.

    Returns:
        The device-info dict of the first device whose name contains *name*
        and whose ``"maxInputChannels"`` is greater than zero, or an empty
        dict when no device qualifies.
    """
    host_api = p.get_host_api_info_by_type(api)
    # Lazy generator: devices are queried one at a time and the scan stops
    # at the first match, so later devices are never touched.
    candidates = (
        p.get_device_info_by_host_api_device_index(host_api["index"], slot)
        for slot in range(host_api["deviceCount"])
    )
    matches = (
        dev
        for dev in candidates
        if name in dev["name"] and dev["maxInputChannels"] > 0
    )
    return next(matches, {})
class AudioProcess:
    """Capture audio from a WASAPI input device and flag detected beats.

    A PyAudio input stream delivers buffers to ``pyaudio_callback``, which
    feeds them to an aubio tempo detector and sets ``is_beat`` (while holding
    ``lock``) whenever the detector reports a beat. A background thread keeps
    polling the stream and releases the audio resources once it ends.

    NOTE(review): nothing in this class resets ``is_beat`` to ``False``;
    presumably the consumer clears it under ``lock`` - confirm against callers.
    """

    def __init__(self, device_name: str):
        """Open the named input device and start the processing thread.

        Args:
            device_name: Substring of the WASAPI device name to capture from.

        Raises:
            KeyError: If no matching input-capable device is found.
        """
        self.pa = pa.PyAudio()
        self.win_s = 1024
        self.hop_s = self.win_s // 2
        self.channels = 2

        i_dev = find_audio_device(self.pa, pa.paWASAPI, device_name)
        if not i_dev:
            # find_audio_device signals "not found" with an empty dict; fail
            # with a readable message and do not leak the PyAudio instance.
            self.pa.terminate()
            raise KeyError(f"no WASAPI input device matching {device_name!r}")
        self.rate = int(i_dev["defaultSampleRate"])

        # Everything the callback touches must exist before the stream is
        # opened: PyAudio starts a stream on open by default, so the callback
        # may fire before __init__ has finished.
        self.lock = threading.Lock()
        self.is_beat = False
        # Pass the device rate so the detector does not assume its default.
        self.tempo = aubio.tempo("default", self.win_s, self.hop_s, self.rate)

        try:
            self.a_source = self.pa.open(
                rate=self.rate,
                channels=self.channels,
                format=pa.paFloat32,
                input=True,
                input_device_index=i_dev["index"],
                frames_per_buffer=self.hop_s,
                stream_callback=self.pyaudio_callback,
            )
        except Exception:
            self.pa.terminate()
            raise

        self.thread = threading.Thread(name="AudioProcess", target=self.process)
        self.thread.start()

    def process(self) -> None:
        """Thread body: run the stream until it goes inactive, then clean up."""
        try:
            self.a_source.start_stream()
            while self.a_source.is_active():
                time.sleep(0.1)
            self.a_source.stop_stream()
        finally:
            # Release the device even if polling or stopping raised.
            self.a_source.close()
            self.pa.terminate()

    def stop(self) -> None:
        """Stop the stream and wait for the processing thread to finish.

        Safe to call after the stream has already ended: once the worker
        thread has exited it has closed the stream itself, so the stream is
        not touched again.
        """
        if self.thread.is_alive():
            # Stopping the stream makes is_active() false, which lets the
            # worker leave its polling loop and release the resources.
            self.a_source.stop_stream()
            self.thread.join()

    @staticmethod
    def _to_mono(raw, channels):
        """Decode interleaved float32 bytes and average the channels."""
        interleaved = np.frombuffer(raw, dtype=np.float32)
        # Drop a trailing partial frame so the reshape cannot fail.
        whole = interleaved.size - interleaved.size % channels
        return interleaved[:whole].reshape(-1, channels).mean(axis=1)

    def pyaudio_callback(self, _in_data, _frame_count, _time_info, _status):
        """PyAudio stream callback: run beat detection on one input buffer.

        Args:
            _in_data: Raw bytes of the captured buffer (interleaved float32,
                per the format/channels the stream was opened with).
            _frame_count: Number of frames in ``_in_data``.
            _time_info: Timing information from PyAudio (unused).
            _status: Status flags from PyAudio (unused).

        Returns:
            ``(data, flag)`` where *flag* is ``pa.paComplete`` when fewer than
            ``hop_s`` frames were delivered and ``pa.paContinue`` otherwise.
        """
        samples = self._to_mono(_in_data, self.channels)
        if samples.size < self.hop_s:
            # The detector is fed hop_s samples per call; zero-pad a short
            # final buffer instead of handing it a wrong-sized vector.
            samples = np.pad(samples, (0, self.hop_s - samples.size))

        if self.tempo(samples):
            with self.lock:
                self.is_beat = True

        flag = pa.paComplete if _frame_count < self.hop_s else pa.paContinue
        return (_in_data, flag)