pybeamshow/util/audio.py
2023-03-05 01:46:59 +01:00

84 lines
2.5 KiB
Python

import time
from typing import List
import aubio
import pyaudiowpatch as pa
import numpy as np
import threading
def find_all_inputs() -> List[dict]:
    """Return the device-info dicts of every WASAPI device that can record.

    A temporary ``PyAudio`` instance is created for the enumeration and is
    always terminated before returning, so repeated calls do not accumulate
    open PortAudio handles.

    Returns:
        One device-info dict per device listed under the WASAPI host API
        whose ``maxInputChannels`` is greater than zero, in host-API device
        order. Empty when no such device exists.
    """
    p = pa.PyAudio()
    try:
        api_info = p.get_host_api_info_by_type(pa.paWASAPI)
        candidates = (
            p.get_device_info_by_host_api_device_index(api_info["index"], i)
            for i in range(api_info["deviceCount"])
        )
        # The list is fully built before ``finally`` runs, so every query
        # happens while the PyAudio instance is still alive.
        return [dev for dev in candidates if dev["maxInputChannels"] > 0]
    finally:
        p.terminate()
def find_audio_device(p: pa.PyAudio, api: int, name: str) -> dict:
    """Find the first input-capable device of a host API by name fragment.

    Args:
        p: PyAudio instance used to query the host API and its devices.
        api: Host API type constant passed to ``get_host_api_info_by_type``.
        name: Substring that must occur in the device's ``name`` entry.

    Returns:
        The device-info dict of the first device whose name contains
        ``name`` and whose ``maxInputChannels`` is greater than zero, or an
        empty dict when no device qualifies.
    """
    host_api = p.get_host_api_info_by_type(api)
    for slot in range(host_api["deviceCount"]):
        candidate = p.get_device_info_by_host_api_device_index(
            host_api["index"], slot
        )
        if name not in candidate["name"]:
            continue
        if candidate["maxInputChannels"] <= 0:
            continue
        return candidate
    # Nothing matched: callers receive an empty dict rather than an exception.
    return {}
class AudioProcess:
    """Run aubio beat detection on a WASAPI input device in the background.

    Constructing an instance opens a callback-driven PyAudio input stream and
    starts a supervisor thread (``process``) that owns the stream's lifetime.
    Whenever the tempo detector reports a beat, ``is_beat`` is set to ``True``
    while holding ``lock``. This class never resets ``is_beat``; presumably
    the consumer clears it under ``lock`` -- confirm against callers.
    """

    def __init__(self, device_name: str):
        """Open the first WASAPI input device whose name contains *device_name*.

        Raises:
            KeyError: If no matching input device is found.
        """
        self.pa = pa.PyAudio()
        self.win_s = 1024
        self.hop_s = self.win_s // 2
        self.channels = 2

        i_dev = find_audio_device(self.pa, pa.paWASAPI, device_name)
        if not i_dev:
            # find_audio_device signals "not found" with an empty dict; fail
            # with a clear message and do not leak the PortAudio handle.
            self.pa.terminate()
            raise KeyError(f"no WASAPI input device matching {device_name!r}")
        self.rate = int(i_dev["defaultSampleRate"])

        # Everything the audio callback touches must exist before the stream
        # is opened, otherwise an early callback hits missing attributes.
        self.lock = threading.Lock()
        self.is_beat = False
        self.tempo = aubio.tempo("default", self.win_s, self.hop_s, self.rate)
        self._stop_event = threading.Event()

        try:
            self.a_source = self.pa.open(
                rate=self.rate,
                channels=self.channels,
                format=pa.paFloat32,
                input=True,
                input_device_index=i_dev["index"],
                frames_per_buffer=self.hop_s,
                stream_callback=self.pyaudio_callback,
                # process() starts the stream explicitly; PyAudio would
                # otherwise start it as soon as it is opened.
                start=False,
            )
        except Exception:
            self.pa.terminate()
            raise

        self.thread = threading.Thread(name="AudioProcess", target=self.process)
        self.thread.start()

    def process(self):
        """Supervise the stream until it ends, ``stop`` is called, or the main thread dies.

        Runs on ``self.thread``. This is the only place that stops and closes
        the stream and terminates PyAudio.
        """
        try:
            self.a_source.start_stream()
            while self.a_source.is_active() and threading.main_thread().is_alive():
                # Doubles as the 0.1 s poll interval; returns True as soon as
                # stop() has been requested.
                if self._stop_event.wait(0.1):
                    break
        finally:
            self.a_source.stop_stream()
            self.a_source.close()
            self.pa.terminate()

    def stop(self):
        """Request shutdown and wait for the supervisor thread to finish.

        Safe to call more than once and after the stream has already ended:
        the stream itself is only ever touched by the supervisor thread.
        """
        self._stop_event.set()
        if self.thread.is_alive() and threading.current_thread() is not self.thread:
            self.thread.join()

    def pyaudio_callback(self, _in_data, _frame_count, _time_info, _status):
        """PyAudio stream callback: feed one hop of audio to the beat detector.

        Args:
            _in_data: Raw interleaved float32 sample bytes for this buffer.
            _frame_count: Number of frames (samples per channel) in the buffer.
            _time_info: Unused.
            _status: Unused.

        Returns:
            ``(data, flag)`` where ``flag`` is ``pa.paComplete`` for a buffer
            shorter than ``hop_s`` and ``pa.paContinue`` otherwise. The data
            element is the unmodified input buffer.
        """
        if _frame_count < self.hop_s:
            # A short buffer ends the stream; the detector is configured for
            # hop_s-sized frames, so it is not fed a partial one.
            return (_in_data, pa.paComplete)

        # The buffer holds _frame_count frames of `channels` interleaved
        # samples; average the channels to get one mono sample per frame.
        interleaved = np.frombuffer(
            _in_data, dtype=np.float32, count=_frame_count * self.channels
        )
        mono = interleaved.reshape(-1, self.channels).mean(axis=1)

        is_beat = self.tempo(mono)
        if is_beat:
            with self.lock:
                self.is_beat = True
        return (_in_data, pa.paContinue)