import time import aubio import pyaudiowpatch as pa import numpy as np import threading def find_audio_device(p: pa.PyAudio, api: int, name: str) -> dict: api_info = p.get_host_api_info_by_type(api) for i in range(api_info["deviceCount"]): dev_info = p.get_device_info_by_host_api_device_index(api_info["index"], i) if name in dev_info["name"] and dev_info["maxInputChannels"] > 0: return dev_info return {} class AudioProcess: def __init__(self): self.pa = pa.PyAudio() self.win_s = 1024 self.hop_s = self.win_s // 2 i_dev = find_audio_device(self.pa, pa.paWASAPI, "Notepad") self.rate = int(i_dev["defaultSampleRate"]) self.a_source = self.pa.open( rate=self.rate, channels=2, format=pa.paFloat32, input=True, input_device_index=i_dev["index"], frames_per_buffer=self.hop_s, stream_callback=self.pyaudio_callback, ) self.lock = threading.Lock() self.is_beat = False self.tempo = aubio.tempo("default", self.win_s, self.hop_s) self.thread = threading.Thread(name="AudioProcess", target=self.process) self.thread.start() def process(self): self.a_source.start_stream() while self.a_source.is_active(): time.sleep(0.1) self.a_source.stop_stream() self.a_source.close() self.pa.terminate() def stop(self): if self.a_source.is_active: self.a_source.stop_stream() if self.thread.is_alive: self.thread.join() def pyaudio_callback(self, _in_data, _frame_count, _time_info, _status): # samples, read = _in_data, _frame_count # audio_data = np.fromstring(in_data, dtype=np.float32) samples = np.fromstring(_in_data, dtype=np.float32, count=_frame_count) read = _frame_count is_beat = self.tempo(samples) if is_beat: with self.lock: # print(self.tempo.get) self.is_beat = True # samples += click # print("tick") # avoid print in audio callback audiobuf = samples.tobytes() if read < self.hop_s: return (audiobuf, pa.paComplete) return (audiobuf, pa.paContinue) # self.stop = False # self.thread = threading.Thread(name="AudioProcess", target=self.process) # self.thread.start() # def process(self): # while not self.stop: # samples = self.dev.record(self.hop_s, self.rate, 2) # mono = samples.sum(axis=1) # is_beat = self.tempo(samples) # if is_beat: