"""Beat detection on a WASAPI input device using PyAudio (pyaudiowpatch) and aubio."""

import threading
import time
from typing import List

import aubio
import numpy as np
import pyaudiowpatch as pa


def find_all_inputs() -> List[dict]:
    """Return the device-info dict of every WASAPI device with input channels.

    A temporary ``PyAudio`` instance is created for the enumeration and is
    always terminated before returning, so no PortAudio handle is leaked.

    Returns:
        A list of the dicts returned by
        ``get_device_info_by_host_api_device_index`` whose
        ``"maxInputChannels"`` is greater than zero.
    """
    p = pa.PyAudio()
    try:
        api_info = p.get_host_api_info_by_type(pa.paWASAPI)
        devices: List[dict] = []
        for i in range(api_info["deviceCount"]):
            dev_info = p.get_device_info_by_host_api_device_index(
                api_info["index"], i
            )
            if dev_info["maxInputChannels"] > 0:
                devices.append(dev_info)
        return devices
    finally:
        p.terminate()


def find_audio_device(p: pa.PyAudio, api: int, name: str) -> dict:
    """Find the first input device of host API *api* whose name contains *name*.

    Args:
        p: The ``PyAudio`` instance used for the lookup.
        api: Host API type constant (for example ``pa.paWASAPI``).
        name: Substring to look for in the device's ``"name"`` entry.

    Returns:
        The device-info dict of the first match that has at least one input
        channel, or an empty dict when nothing matches.
    """
    api_info = p.get_host_api_info_by_type(api)
    for i in range(api_info["deviceCount"]):
        dev_info = p.get_device_info_by_host_api_device_index(api_info["index"], i)
        if name in dev_info["name"] and dev_info["maxInputChannels"] > 0:
            return dev_info
    return {}


class AudioProcess:
    """Capture audio from a WASAPI input device and flag detected beats.

    The stream runs in PyAudio callback mode. Each callback downmixes the
    captured buffer to mono, feeds it to an ``aubio.tempo`` detector and sets
    ``is_beat`` to ``True`` (while holding ``lock``) when a beat is reported.
    This class never resets ``is_beat``; that is left to whoever reads it.

    A worker thread named ``"AudioProcess"`` owns the stream lifecycle: it
    starts the stream, waits until it should end, then stops and closes the
    stream and terminates PortAudio.
    """

    # Number of interleaved channels requested from the device.
    CHANNELS = 2

    def __init__(self, device_name: str):
        """Open the named input device and start the worker thread.

        Args:
            device_name: Substring of the WASAPI input device's name.

        Raises:
            KeyError: If no WASAPI input device matches *device_name*.
        """
        self.pa = pa.PyAudio()
        self.win_s = 1024
        self.hop_s = self.win_s // 2

        i_dev = find_audio_device(self.pa, pa.paWASAPI, device_name)
        if not i_dev:
            # Same exception type the failed dict lookup used to produce, but
            # with a usable message and without leaking the PyAudio instance.
            self.pa.terminate()
            raise KeyError(f"no WASAPI input device matching {device_name!r}")
        self.rate = int(i_dev["defaultSampleRate"])

        # Everything the callback touches must exist before the stream is
        # opened: PyAudio starts streams on open by default, so the callback
        # may fire before __init__ has finished.
        self.lock = threading.Lock()
        self.is_beat = False
        # Pass the device rate so the detector does not assume aubio's
        # default sample rate.
        self.tempo = aubio.tempo("default", self.win_s, self.hop_s, self.rate)
        self._stop_event = threading.Event()

        try:
            self.a_source = self.pa.open(
                rate=self.rate,
                channels=self.CHANNELS,
                format=pa.paFloat32,
                input=True,
                input_device_index=i_dev["index"],
                frames_per_buffer=self.hop_s,
                stream_callback=self.pyaudio_callback,
            )
        except Exception:
            self.pa.terminate()
            raise

        self.thread = threading.Thread(name="AudioProcess", target=self.process)
        self.thread.start()

    def process(self):
        """Worker-thread body: run the stream until it should end, then clean up.

        The loop ends when ``stop()`` is called, when the stream is no longer
        active, or when the main thread has exited. The stream and the
        PyAudio instance are released in every case.
        """
        try:
            self.a_source.start_stream()
            while self.a_source.is_active() and threading.main_thread().is_alive():
                # Wake up immediately on stop() instead of sleeping blindly.
                if self._stop_event.wait(0.1):
                    break
        finally:
            try:
                self.a_source.stop_stream()
                self.a_source.close()
            finally:
                self.pa.terminate()

    def stop(self):
        """Ask the worker thread to shut the stream down and wait for it.

        Only the worker thread touches the stream, so this never operates on
        a stream that the worker has already closed. Calling it more than
        once is harmless.
        """
        self._stop_event.set()
        # Joining the current thread would raise RuntimeError.
        if self.thread is not threading.current_thread():
            self.thread.join()

    def pyaudio_callback(self, _in_data, _frame_count, _time_info, _status):
        """PyAudio stream callback: run beat detection on one captured buffer.

        Args:
            _in_data: Raw bytes of interleaved float32 samples.
            _frame_count: Number of frames in ``_in_data``.
            _time_info: Timing information from PyAudio (unused).
            _status: Status flags from PyAudio (unused).

        Returns:
            ``(data, flag)`` where *flag* is ``pa.paComplete`` when fewer than
            ``hop_s`` frames were delivered and ``pa.paContinue`` otherwise.
        """
        interleaved = np.frombuffer(_in_data, dtype=np.float32)
        frames = interleaved.size // self.CHANNELS
        # Average the interleaved channels into one mono signal.
        mono = (
            interleaved[: frames * self.CHANNELS]
            .reshape(frames, self.CHANNELS)
            .mean(axis=1, dtype=np.float32)
        )

        # The detector was built for exactly hop_s samples per call.
        if frames < self.hop_s:
            padded = np.zeros(self.hop_s, dtype=np.float32)
            padded[:frames] = mono
            mono = padded
        elif frames > self.hop_s:
            mono = mono[: self.hop_s]

        if np.any(self.tempo(mono)):
            with self.lock:
                self.is_beat = True

        flag = pa.paComplete if _frame_count < self.hop_s else pa.paContinue
        return (_in_data, flag)