From 950e76e4089add07b0518dff48786840521a6271 Mon Sep 17 00:00:00 2001 From: Patrick Moessler Date: Wed, 22 Feb 2023 03:43:45 +0100 Subject: [PATCH] audio beat detection temp --- .vscode/launch.json | 2 +- audio.py | 88 +++++++++++++++++++++++++++++++ beamshow.py | 126 ++++++++++++++++++++++++++------------------ requirements.txt | 3 +- 4 files changed, 166 insertions(+), 53 deletions(-) create mode 100644 audio.py diff --git a/.vscode/launch.json b/.vscode/launch.json index 1503c3c..09faf0e 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -11,7 +11,7 @@ "program": "beamshow.py", "console": "integratedTerminal", "justMyCode": true, - "args": ["-w"] + "args": ["-w","-b"] } ] } \ No newline at end of file diff --git a/audio.py b/audio.py new file mode 100644 index 0000000..0b2cdd9 --- /dev/null +++ b/audio.py @@ -0,0 +1,88 @@ +import time +import aubio +import pyaudiowpatch as pa + +import numpy as np + +import threading + + +def find_audio_device(p: pa.PyAudio, api: int, name: str) -> dict: + api_info = p.get_host_api_info_by_type(api) + for i in range(api_info["deviceCount"]): + dev_info = p.get_device_info_by_host_api_device_index(api_info["index"], i) + if name in dev_info["name"] and dev_info["maxInputChannels"] > 0: + return dev_info + return {} + + +class AudioProcess: + def __init__(self): + self.pa = pa.PyAudio() + + self.win_s = 1024 + self.hop_s = self.win_s // 2 + + i_dev = find_audio_device(self.pa, pa.paWASAPI, "Notepad") + self.rate = int(i_dev["defaultSampleRate"]) + self.a_source = self.pa.open( + rate=self.rate, + channels=2, + format=pa.paFloat32, + input=True, + input_device_index=i_dev["index"], + frames_per_buffer=self.hop_s, + stream_callback=self.pyaudio_callback, + ) + + self.lock = threading.Lock() + self.is_beat = False + + self.tempo = aubio.tempo("default", self.win_s, self.hop_s) + self.thread = threading.Thread(name="AudioProcess", target=self.process) + self.thread.start() + + def process(self): + self.a_source.start_stream() + + while self.a_source.is_active(): + time.sleep(0.1) + + self.a_source.stop_stream() + self.a_source.close() + self.pa.terminate() + + def stop(self): + if self.a_source.is_active: + self.a_source.stop_stream() + if self.thread.is_alive: + self.thread.join() + + def pyaudio_callback(self, _in_data, _frame_count, _time_info, _status): + # samples, read = _in_data, _frame_count + # audio_data = np.fromstring(in_data, dtype=np.float32) + samples = np.fromstring(_in_data, dtype=np.float32, count=_frame_count) + read = _frame_count + is_beat = self.tempo(samples) + if is_beat: + with self.lock: + self.is_beat = True + # samples += click + # print("tick") # avoid print in audio callback + + audiobuf = samples.tobytes() + if read < self.hop_s: + return (audiobuf, pa.paComplete) + return (audiobuf, pa.paContinue) + + # self.stop = False + # self.thread = threading.Thread(name="AudioProcess", target=self.process) + # self.thread.start() + + # def process(self): + # while not self.stop: + # samples = self.dev.record(self.hop_s, self.rate, 2) + # mono = samples.sum(axis=1) + # is_beat = self.tempo(samples) + + # if is_beat: diff --git a/beamshow.py b/beamshow.py index 54da3db..8bc2799 100644 --- a/beamshow.py +++ b/beamshow.py @@ -7,6 +7,7 @@ import sys from effects.effect import Effect, Colors, color_darken, color_fadeout, color_wheel from effects.presets import Presets +from audio import AudioProcess def print_displays() -> None: @@ -38,11 +39,15 @@ class Beamshow: self.effects: List[Effect] = [] self.clock = pg.time.Clock() self.beat_reactive = beat_reactive + self.beat_factor = 1 + self.beat_skip_counter = 0 pg.display.init() self.window, self.background = self.initialize() + self.audio = AudioProcess() def initialize(self) -> Tuple[pg.Surface, pg.Surface]: + recreate_effect = not self.effects or pg.display.is_fullscreen == self.windowed displays = pg.display.get_desktop_sizes() if not 0 <= self.display_id < len(displays): raise ValueError( @@ -63,20 +68,14 @@ class Beamshow: background = pg.Surface(win.get_size()) background.fill(Colors.Black) - self.presets = Presets(bounds=win.get_rect(), beat_reactive=self.beat_reactive) - if self.randomize: - self.effects = self.presets.randomize() - else: - # self.effects = self.presets.default() - from effects.spiro import Spiro - - self.effects = [ - Spiro( - bounds=win.get_rect(), - color=color_wheel(increase=10), - sizes=(win.get_rect().height * 0.8, win.get_rect().height * 0.8), - ) - ] + if recreate_effect: + self.presets = Presets( + bounds=win.get_rect(), beat_reactive=self.beat_reactive + ) + if self.randomize: + self.effects = self.presets.randomize() + else: + self.effects = self.presets.default() return win, background @@ -147,22 +146,34 @@ class Beamshow: framecounter = 0 blackout = False single_random = False - taps = [] - tap_mean = 0 - last_beat = 0 + # taps = [] + # tap_mean = 0 + # last_beat = 0 fps_slidewindow = [] frames_per_beat = 0 while True: - is_beat = False + with self.audio.lock: + is_beat = self.audio.is_beat + self.audio.is_beat = False + if is_beat: + if self.beat_skip_counter > 0: + is_beat = False + print(' skip') + self.beat_skip_counter -= 1 + else: + print('beat') + self.beat_skip_counter = self.beat_factor - 1 + + # is_beat = False fps_mean = ( sum(fps_slidewindow) / len(fps_slidewindow) if fps_slidewindow else 0 ) - if tap_mean and last_beat: - this_beat = time.time() - is_beat = this_beat >= last_beat + tap_mean - frames_per_beat = fps_mean * tap_mean - if is_beat: - last_beat = this_beat + # if tap_mean and last_beat: + # this_beat = time.time() + # is_beat = this_beat >= last_beat + tap_mean + # frames_per_beat = fps_mean * tap_mean + # if is_beat: + # last_beat = this_beat common_events = loop.send((is_beat, frames_per_beat)) reinitialize = False @@ -170,12 +181,17 @@ class Beamshow: if event.type == pg.QUIT or ( event.type == pg.KEYDOWN and event.key == pg.K_ESCAPE ): + self.audio.stop() pg.quit() sys.exit() elif event.type == pg.KEYDOWN: if event.key == pg.K_F5: single_random = True print("Switching to new random preset") + elif event.key == pg.K_F6: + print("Reload") + self.effects.clear() + reinitialize = True elif event.key == pg.K_F8: self.randomize = not self.randomize state_str = ( @@ -201,35 +217,42 @@ class Beamshow: print(f'BLACKOUT [{"ON" if blackout else "off"}]') elif event.key == pg.K_HOME: print("resetting beat timing") - this_tap = time.time() - last_beat = this_tap - elif event.key == pg.K_END: - print("starting new tap measurement") - taps.clear() + self.beat_skip_counter = 0 + # this_tap = time.time() + # last_beat = this_tap + # elif event.key == pg.K_END: + # print("starting new tap measurement") + # taps.clear() elif event.key == pg.K_PAGEUP: - tap_mean /= 2 - print( - f"Taps: Mean {tap_mean:5.3f} s, {60/tap_mean:5.3f} BPM, {fps_mean * tap_mean:5.3f} FPB [x2]" + self.beat_factor = ( + self.beat_factor / 2 if self.beat_factor > 1 else 1 ) + print(f"Trigger on every {self.beat_factor} beat") + # tap_mean /= 2 + # print( + # f"Taps: Mean {tap_mean:5.3f} s, {60/tap_mean:5.3f} BPM, {fps_mean * tap_mean:5.3f} FPB [x2]" + # ) elif event.key == pg.K_PAGEDOWN: - tap_mean *= 2 - print( - f"Taps: Mean {tap_mean:5.3f} s, {60/tap_mean:5.3f} BPM, {fps_mean * tap_mean:5.3f} FPB [/2]" - ) - elif event.key == pg.K_KP_ENTER or event.key == pg.K_RETURN: - this_tap = time.time() - if taps and this_tap - taps[-1] > 10: - print("starting new tap measurement") - taps.clear() - taps.append(this_tap) - last_beat = this_tap - if len(taps) >= 2: - tap_mean = sum( - map(lambda t1, t2: t2 - t1, taps, taps[1:]) - ) / (len(taps) - 1) - print( - f"Taps: Mean {tap_mean:5.3f} s, {60/tap_mean:5.3f} BPM, {fps_mean * tap_mean:5.3f} FPB" - ) + self.beat_factor = self.beat_factor * 2 + print(f"Trigger on every {self.beat_factor} beat") + # tap_mean *= 2 + # print( + # f"Taps: Mean {tap_mean:5.3f} s, {60/tap_mean:5.3f} BPM, {fps_mean * tap_mean:5.3f} FPB [/2]" + # ) + # elif event.key == pg.K_KP_ENTER or event.key == pg.K_RETURN: + # this_tap = time.time() + # if taps and this_tap - taps[-1] > 10: + # print("starting new tap measurement") + # taps.clear() + # taps.append(this_tap) + # last_beat = this_tap + # if len(taps) >= 2: + # tap_mean = sum( + # map(lambda t1, t2: t2 - t1, taps, taps[1:]) + # ) / (len(taps) - 1) + # print( + # f"Taps: Mean {tap_mean:5.3f} s, {60/tap_mean:5.3f} BPM, {fps_mean * tap_mean:5.3f} FPB" + # ) if reinitialize: self.window, self.background = self.initialize() @@ -256,8 +279,9 @@ class Beamshow: new_preset = self.presets.randomize() while new_preset == self.effects: new_preset = self.presets.randomize() + print(new_preset) self.effects.clear() - self.effects.extend(self.presets.randomize()) + self.effects.extend(new_preset) current_fps = self.clock.get_fps() fps_slidewindow.append(current_fps) diff --git a/requirements.txt b/requirements.txt index 0e64225..ba810ca 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,2 +1,3 @@ pygame==2.1.3 -soundcard==0.4.2 \ No newline at end of file +soundcard==0.4.2 +pyaudiowpatch==0.2.12.5 \ No newline at end of file