audio beat detection temp

This commit is contained in:
Patrick Moessler 2023-02-22 03:43:45 +01:00
parent 2aac7f7fac
commit 950e76e408
4 changed files with 166 additions and 53 deletions

2
.vscode/launch.json vendored
View file

@ -11,7 +11,7 @@
"program": "beamshow.py",
"console": "integratedTerminal",
"justMyCode": true,
"args": ["-w"]
"args": ["-w","-b"]
}
]
}

88
audio.py Normal file
View file

@ -0,0 +1,88 @@
import time
import aubio
import pyaudiowpatch as pa
import numpy as np
import threading
def find_audio_device(p: pa.PyAudio, api: int, name: str) -> dict:
api_info = p.get_host_api_info_by_type(api)
for i in range(api_info["deviceCount"]):
dev_info = p.get_device_info_by_host_api_device_index(api_info["index"], i)
if name in dev_info["name"] and dev_info["maxInputChannels"] > 0:
return dev_info
return {}
class AudioProcess:
def __init__(self):
self.pa = pa.PyAudio()
self.win_s = 1024
self.hop_s = self.win_s // 2
i_dev = find_audio_device(self.pa, pa.paWASAPI, "Notepad")
self.rate = int(i_dev["defaultSampleRate"])
self.a_source = self.pa.open(
rate=self.rate,
channels=2,
format=pa.paFloat32,
input=True,
input_device_index=i_dev["index"],
frames_per_buffer=self.hop_s,
stream_callback=self.pyaudio_callback,
)
self.lock = threading.Lock()
self.is_beat = False
self.tempo = aubio.tempo("default", self.win_s, self.hop_s)
self.thread = threading.Thread(name="AudioProcess", target=self.process)
self.thread.start()
def process(self):
self.a_source.start_stream()
while self.a_source.is_active():
time.sleep(0.1)
self.a_source.stop_stream()
self.a_source.close()
self.pa.terminate()
def stop(self):
if self.a_source.is_active:
self.a_source.stop_stream()
if self.thread.is_alive:
self.thread.join()
def pyaudio_callback(self, _in_data, _frame_count, _time_info, _status):
# samples, read = _in_data, _frame_count
# audio_data = np.fromstring(in_data, dtype=np.float32)
samples = np.fromstring(_in_data, dtype=np.float32, count=_frame_count)
read = _frame_count
is_beat = self.tempo(samples)
if is_beat:
with self.lock:
self.is_beat = True
# samples += click
# print("tick") # avoid print in audio callback
audiobuf = samples.tobytes()
if read < self.hop_s:
return (audiobuf, pa.paComplete)
return (audiobuf, pa.paContinue)
# self.stop = False
# self.thread = threading.Thread(name="AudioProcess", target=self.process)
# self.thread.start()
# def process(self):
# while not self.stop:
# samples = self.dev.record(self.hop_s, self.rate, 2)
# mono = samples.sum(axis=1)
# is_beat = self.tempo(samples)
# if is_beat:

View file

@ -7,6 +7,7 @@ import sys
from effects.effect import Effect, Colors, color_darken, color_fadeout, color_wheel
from effects.presets import Presets
from audio import AudioProcess
def print_displays() -> None:
@ -38,11 +39,15 @@ class Beamshow:
self.effects: List[Effect] = []
self.clock = pg.time.Clock()
self.beat_reactive = beat_reactive
self.beat_factor = 1
self.beat_skip_counter = 0
pg.display.init()
self.window, self.background = self.initialize()
self.audio = AudioProcess()
def initialize(self) -> Tuple[pg.Surface, pg.Surface]:
recreate_effect = not self.effects or pg.display.is_fullscreen == self.windowed
displays = pg.display.get_desktop_sizes()
if not 0 <= self.display_id < len(displays):
raise ValueError(
@ -63,20 +68,14 @@ class Beamshow:
background = pg.Surface(win.get_size())
background.fill(Colors.Black)
self.presets = Presets(bounds=win.get_rect(), beat_reactive=self.beat_reactive)
if recreate_effect:
self.presets = Presets(
bounds=win.get_rect(), beat_reactive=self.beat_reactive
)
if self.randomize:
self.effects = self.presets.randomize()
else:
# self.effects = self.presets.default()
from effects.spiro import Spiro
self.effects = [
Spiro(
bounds=win.get_rect(),
color=color_wheel(increase=10),
sizes=(win.get_rect().height * 0.8, win.get_rect().height * 0.8),
)
]
self.effects = self.presets.default()
return win, background
@ -147,22 +146,34 @@ class Beamshow:
framecounter = 0
blackout = False
single_random = False
taps = []
tap_mean = 0
last_beat = 0
# taps = []
# tap_mean = 0
# last_beat = 0
fps_slidewindow = []
frames_per_beat = 0
while True:
with self.audio.lock:
is_beat = self.audio.is_beat
self.audio.is_beat = False
if is_beat:
if self.beat_skip_counter > 0:
is_beat = False
print(' skip')
self.beat_skip_counter -= 1
else:
print('beat')
self.beat_skip_counter = self.beat_factor - 1
# is_beat = False
fps_mean = (
sum(fps_slidewindow) / len(fps_slidewindow) if fps_slidewindow else 0
)
if tap_mean and last_beat:
this_beat = time.time()
is_beat = this_beat >= last_beat + tap_mean
frames_per_beat = fps_mean * tap_mean
if is_beat:
last_beat = this_beat
# if tap_mean and last_beat:
# this_beat = time.time()
# is_beat = this_beat >= last_beat + tap_mean
# frames_per_beat = fps_mean * tap_mean
# if is_beat:
# last_beat = this_beat
common_events = loop.send((is_beat, frames_per_beat))
reinitialize = False
@ -170,12 +181,17 @@ class Beamshow:
if event.type == pg.QUIT or (
event.type == pg.KEYDOWN and event.key == pg.K_ESCAPE
):
self.audio.stop()
pg.quit()
sys.exit()
elif event.type == pg.KEYDOWN:
if event.key == pg.K_F5:
single_random = True
print("Switching to new random preset")
elif event.key == pg.K_F6:
print("Reload")
self.effects.clear()
reinitialize = True
elif event.key == pg.K_F8:
self.randomize = not self.randomize
state_str = (
@ -201,35 +217,42 @@ class Beamshow:
print(f'BLACKOUT [{"ON" if blackout else "off"}]')
elif event.key == pg.K_HOME:
print("resetting beat timing")
this_tap = time.time()
last_beat = this_tap
elif event.key == pg.K_END:
print("starting new tap measurement")
taps.clear()
self.beat_skip_counter = 0
# this_tap = time.time()
# last_beat = this_tap
# elif event.key == pg.K_END:
# print("starting new tap measurement")
# taps.clear()
elif event.key == pg.K_PAGEUP:
tap_mean /= 2
print(
f"Taps: Mean {tap_mean:5.3f} s, {60/tap_mean:5.3f} BPM, {fps_mean * tap_mean:5.3f} FPB [x2]"
self.beat_factor = (
self.beat_factor / 2 if self.beat_factor > 1 else 1
)
print(f"Trigger on every {self.beat_factor} beat")
# tap_mean /= 2
# print(
# f"Taps: Mean {tap_mean:5.3f} s, {60/tap_mean:5.3f} BPM, {fps_mean * tap_mean:5.3f} FPB [x2]"
# )
elif event.key == pg.K_PAGEDOWN:
tap_mean *= 2
print(
f"Taps: Mean {tap_mean:5.3f} s, {60/tap_mean:5.3f} BPM, {fps_mean * tap_mean:5.3f} FPB [/2]"
)
elif event.key == pg.K_KP_ENTER or event.key == pg.K_RETURN:
this_tap = time.time()
if taps and this_tap - taps[-1] > 10:
print("starting new tap measurement")
taps.clear()
taps.append(this_tap)
last_beat = this_tap
if len(taps) >= 2:
tap_mean = sum(
map(lambda t1, t2: t2 - t1, taps, taps[1:])
) / (len(taps) - 1)
print(
f"Taps: Mean {tap_mean:5.3f} s, {60/tap_mean:5.3f} BPM, {fps_mean * tap_mean:5.3f} FPB"
)
self.beat_factor = self.beat_factor * 2
print(f"Trigger on every {self.beat_factor} beat")
# tap_mean *= 2
# print(
# f"Taps: Mean {tap_mean:5.3f} s, {60/tap_mean:5.3f} BPM, {fps_mean * tap_mean:5.3f} FPB [/2]"
# )
# elif event.key == pg.K_KP_ENTER or event.key == pg.K_RETURN:
# this_tap = time.time()
# if taps and this_tap - taps[-1] > 10:
# print("starting new tap measurement")
# taps.clear()
# taps.append(this_tap)
# last_beat = this_tap
# if len(taps) >= 2:
# tap_mean = sum(
# map(lambda t1, t2: t2 - t1, taps, taps[1:])
# ) / (len(taps) - 1)
# print(
# f"Taps: Mean {tap_mean:5.3f} s, {60/tap_mean:5.3f} BPM, {fps_mean * tap_mean:5.3f} FPB"
# )
if reinitialize:
self.window, self.background = self.initialize()
@ -256,8 +279,9 @@ class Beamshow:
new_preset = self.presets.randomize()
while new_preset == self.effects:
new_preset = self.presets.randomize()
print(new_preset)
self.effects.clear()
self.effects.extend(self.presets.randomize())
self.effects.extend(new_preset)
current_fps = self.clock.get_fps()
fps_slidewindow.append(current_fps)

View file

@ -1,2 +1,3 @@
pygame==2.1.3
soundcard==0.4.2
pyaudiowpatch==0.2.12.5