"""beamshow - render an audio-reactive light show for a video projector."""

import sys
from argparse import ArgumentParser
from collections import deque
from typing import Deque, Generator, List, Tuple

import pygame as pg

from effects.effect import Effect
from effects.presets import Presets
from util.color import Colors
from util.audio import AudioProcess, find_all_inputs

# Frame rate requested from the pygame clock on every iteration of the main loop.
_TARGET_FPS = 60
# Number of recent FPS samples that are averaged for the FPS read-out/marker.
_FPS_WINDOW = 120
# Average FPS below which the yellow / red warning marker is drawn.
_FPS_WARN = 50
_FPS_CRITICAL = 40
# Upper bound on re-rolls when looking for a preset that differs from the
# current one, so a collection with a single preset cannot hang the loop.
_MAX_PRESET_RETRIES = 10

RenderLoop = Generator[List[pg.event.Event], Tuple[bool, float], None]


def print_displays() -> None:
    """Print index and resolution of every desktop display, then exit with 0."""
    pg.display.init()
    for index, (width, height) in enumerate(pg.display.get_desktop_sizes()):
        print(f"#{index} - {width}x{height}")
    sys.exit(0)


def print_inputs() -> None:
    """Print the name of every audio input, one per line, then exit with 0."""
    print("\n".join(dev["name"] for dev in find_all_inputs()))
    sys.exit(0)


class Beamshow:
    """Application object: owns the window, the active effects and the audio input."""

    def __init__(
        self,
        audio_device_name: str,
        render3d: bool,
        window: bool,
        trails: bool,
        randomize_interval: float,
        fps: bool,
        display: int,
        beat_reactive: bool,
    ) -> None:
        """Store the settings, open the display and start the audio process.

        Args:
            audio_device_name: Passed unchanged to ``AudioProcess``.
            render3d: Start with the pseudo-3D render loop.
            window: Open a half-size window instead of going fullscreen.
            trails: Use a mostly transparent background so old frames fade.
            randomize_interval: Seconds between automatic preset switches.
                A falsy value disables switching and stores 5 as the
                interval used if switching is enabled later.
            fps: Print FPS figures to the console.
            display: Index into ``pg.display.get_desktop_sizes()``.
            beat_reactive: Forwarded to ``Presets``.

        Raises:
            ValueError: If ``display`` is not a valid display index.
        """
        self.render3d = render3d
        self.windowed = window
        self.trails = trails
        self.randomize = bool(randomize_interval)
        self.randomize_interval = randomize_interval or 5
        self.print_fps = fps
        self.display_id = display
        self.effects: List[Effect] = []
        self.clock = pg.time.Clock()
        self.beat_reactive = beat_reactive
        # Only every `beat_factor`-th detected beat is passed on to the effects.
        self.beat_factor = 1
        self.beat_skip_counter = 0
        self.last_preset = ""

        pg.display.init()
        self.window, self.background = self.initialize()
        self.audio = AudioProcess(audio_device_name)

    def initialize(self) -> Tuple[pg.Surface, pg.Surface]:
        """(Re)create the output window and the per-frame background surface.

        The presets and the active effect list are rebuilt only when there
        are no effects yet or when the fullscreen state is about to change.

        Returns:
            The display surface and the background surface that the render
            loops blit at the start of every frame.

        Raises:
            ValueError: If ``self.display_id`` is not a valid display index.
        """
        # is_fullscreen() == windowed means the current mode is the opposite of
        # the requested one. Evaluated before set_mode() changes the state.
        recreate_effect = (
            not self.effects or pg.display.is_fullscreen() == self.windowed
        )

        displays = pg.display.get_desktop_sizes()
        if not 0 <= self.display_id < len(displays):
            raise ValueError(
                f"Display ID {self.display_id} invalid. "
                f"Must be between 0 and {len(displays) - 1}!"
            )

        desktop_width, desktop_height = displays[self.display_id]
        if self.windowed:
            size = (desktop_width // 2, desktop_height // 2)
            flags = 0
        else:
            size = displays[self.display_id]
            flags = pg.FULLSCREEN
        win = pg.display.set_mode(size=size, flags=flags, display=self.display_id)

        if self.trails:
            # Nearly transparent black: blitting it only darkens the previous
            # frame slightly instead of erasing it.
            background = pg.Surface(win.get_size(), flags=pg.SRCALPHA)
            background.fill(pg.Color(0, 0, 0, 5))
        else:
            background = pg.Surface(win.get_size())
            background.fill(Colors.Black)

        if recreate_effect:
            self.presets = Presets(
                bounds=win.get_rect(), beat_reactive=self.beat_reactive
            )
            # Effect lists are copied because self.effects is cleared and
            # refilled in place later on; whether Presets hands out its own
            # stored list is not visible from this file.
            if self.last_preset and self.last_preset in self.presets:
                self.effects = list(self.presets[self.last_preset])
            elif self.randomize:
                self.last_preset = self.presets.randomize()
                self.effects = list(self.presets[self.last_preset])
            else:
                self.effects = list(self.presets.default())

        return win, background

    def render_loop_normal(self) -> RenderLoop:
        """Generator that renders one frame per step directly onto the window.

        Each step yields the pygame events collected for that frame and
        receives ``(is_beat, frames_per_beat)`` for the next frame.
        """
        is_beat = False
        frames_per_beat = 0.0
        while True:
            unhandled_events: List[pg.event.Event] = list(pg.event.get())

            self.window.blit(self.background, (0, 0))
            for effect in self.effects:
                effect.update(is_beat=is_beat, frames_per_beat=frames_per_beat)
                effect.draw(self.window)

            is_beat, frames_per_beat = yield unhandled_events

    def render_loop_3d(self) -> RenderLoop:
        """Generator that renders the pseudo-3D preview, one frame per step.

        Effects are drawn onto an off-screen stage which is then blitted to
        the window 26 times, centred, at increasing scale and decreasing
        alpha. The send/yield protocol matches ``render_loop_normal``.
        """
        stage = pg.Surface(size=self.window.get_size())
        stage.fill(Colors.Black)
        full_width, full_height = stage.get_size()

        # Alpha, size and centred position of every layer are the same for
        # each frame, so they are computed once up front.
        layers = []
        for step in range(0, 101, 4):
            depth = step / 100
            percent = int(100 * depth**2)
            layer_size = (
                (full_width * percent) // 100,
                (full_height * percent) // 100,
            )
            layer_pos = (
                (full_width - layer_size[0]) // 2,
                (full_height - layer_size[1]) // 2,
            )
            layers.append((192 - 192 * (depth**0.5), layer_size, layer_pos))

        is_beat = False
        frames_per_beat = 0.0
        while True:
            unhandled_events: List[pg.event.Event] = list(pg.event.get())

            stage.blit(self.background, (0, 0))
            for effect in self.effects:
                effect.update(is_beat=is_beat, frames_per_beat=frames_per_beat)
                effect.draw(stage)

            self.window.fill(Colors.Black)
            # Black pixels of the stage are skipped when blitting the layers.
            stage.set_colorkey(Colors.Black)
            for alpha, layer_size, layer_pos in layers:
                stage.set_alpha(alpha)
                self.window.blit(pg.transform.scale(stage, layer_size), layer_pos)

            is_beat, frames_per_beat = yield unhandled_events

    def _start_render_loop(self) -> RenderLoop:
        """Create the render generator for the current mode and prime it."""
        loop = self.render_loop_3d() if self.render3d else self.render_loop_normal()
        # Advance to the first yield so that send() can be used afterwards.
        next(loop)
        return loop

    def _pick_new_preset(self):
        """Return a random preset key, preferring one other than the current.

        Re-rolls a bounded number of times; if every roll equals
        ``self.last_preset`` that key is returned anyway.
        """
        candidate = self.presets.randomize()
        for _ in range(_MAX_PRESET_RETRIES):
            if candidate != self.last_preset:
                break
            candidate = self.presets.randomize()
        return candidate

    def main(self) -> None:
        """Run the show until the window is closed or Escape is pressed.

        Per frame: fetch and thin out the beat flag, render, handle
        keyboard input, apply strobe/blackout, draw the FPS warning marker,
        optionally switch presets, then flip the display. Leaves via
        ``sys.exit()``; it never returns normally.
        """
        loop = self._start_render_loop()

        framecounter = 0
        blackout = False
        single_random = False
        strobe = False
        fps_slidewindow: Deque[float] = deque(maxlen=_FPS_WINDOW)
        # NOTE(review): never updated in this file, so effects always get 0.
        frames_per_beat = 0
        pause_beat = False

        while True:
            # Consume the beat flag set by the audio process.
            with self.audio.lock:
                is_beat = self.audio.is_beat
                self.audio.is_beat = False

            if is_beat:
                if self.beat_skip_counter > 0:
                    is_beat = False
                    self.beat_skip_counter -= 1
                else:
                    self.beat_skip_counter = self.beat_factor - 1

            # Mean over the samples of previous frames (0 while empty).
            fps_mean = (
                sum(fps_slidewindow) / len(fps_slidewindow) if fps_slidewindow else 0
            )

            common_events = loop.send((is_beat and not pause_beat, frames_per_beat))

            reinitialize = False
            for event in common_events:
                if event.type == pg.QUIT or (
                    event.type == pg.KEYDOWN and event.key == pg.K_ESCAPE
                ):
                    self.audio.stop()
                    pg.quit()
                    sys.exit()
                elif event.type == pg.KEYDOWN:
                    if event.key == pg.K_F5:
                        single_random = True
                        print("Switching to new random preset")
                    elif event.key == pg.K_F6:
                        print("Reload")
                        # An empty effect list makes initialize() rebuild the presets.
                        self.effects.clear()
                        reinitialize = True
                    elif event.key == pg.K_F7:
                        pause_beat = not pause_beat
                        print(f"Pause beat: {pause_beat}")
                    elif event.key == pg.K_F8:
                        self.randomize = not self.randomize
                        state_str = (
                            f"on, {self.randomize_interval} s"
                            if self.randomize
                            else "off"
                        )
                        print(f"Random preset switching: [{state_str}]")
                    elif event.key == pg.K_F9:
                        self.render3d = not self.render3d
                        print(f'Pseudo-3d view [{"on" if self.render3d else "off"}]')
                        reinitialize = True
                    elif event.key == pg.K_F10:
                        self.trails = not self.trails
                        print(f'Trails [{"on" if self.trails else "off"}]')
                        reinitialize = True
                    elif event.key == pg.K_F11:
                        self.windowed = not self.windowed
                        print(f'Windowed mode [{"on" if self.windowed else "off"}]')
                        reinitialize = True
                    elif event.key == pg.K_SPACE:
                        blackout = not blackout
                        print(f'BLACKOUT [{"ON" if blackout else "off"}]')
                    elif event.key == pg.K_HOME:
                        print("resetting beat timing")
                        self.beat_skip_counter = 0
                    elif event.key == pg.K_PAGEUP:
                        self.beat_factor = (
                            self.beat_factor // 2 if self.beat_factor > 1 else 1
                        )
                        print(f"Trigger on every {self.beat_factor} beat")
                    elif event.key == pg.K_PAGEDOWN:
                        self.beat_factor = self.beat_factor * 2
                        print(f"Trigger on every {self.beat_factor} beat")
                    elif event.key in (pg.K_RETURN, pg.K_KP_ENTER):
                        strobe = True
                elif event.type == pg.KEYUP:
                    if event.key in (pg.K_RETURN, pg.K_KP_ENTER):
                        strobe = False

            if reinitialize:
                self.window, self.background = self.initialize()
                loop = self._start_render_loop()

            if strobe:
                # Two white frames followed by two black frames.
                self.window.fill(
                    Colors.White if (framecounter % 4 <= 1) else Colors.Black
                )
            if blackout:
                self.window.fill(Colors.Black)

            current_fps = self.clock.get_fps()
            # The deque drops its oldest sample once _FPS_WINDOW is reached.
            fps_slidewindow.append(current_fps)

            if self.print_fps and (framecounter & 0x3F == 0):
                print(f"FPS: {current_fps:5.2f}, avg {fps_mean:5.2f}")

            # The stricter threshold is tested first; otherwise the red
            # marker could never be reached.
            if fps_mean < _FPS_CRITICAL:
                marker_color = Colors.Red
            elif fps_mean < _FPS_WARN:
                marker_color = Colors.Yellow
            else:
                marker_color = None
            if marker_color is not None:
                pg.draw.rect(
                    self.window,
                    marker_color,
                    (self.window.get_width() - 20, 0, 20, 20),
                )

            if (
                self.randomize
                and (framecounter % (self.randomize_interval * _TARGET_FPS)) == 0
            ) or single_random:
                single_random = False
                new_preset = self._pick_new_preset()
                print(new_preset)
                self.last_preset = new_preset
                # Replace the contents in place; the right-hand side is
                # copied before the list is modified.
                self.effects[:] = self.presets[self.last_preset]

            pg.display.flip()
            self.clock.tick(_TARGET_FPS)
            framecounter += 1


def app_main() -> None:
    """Parse the command line and run the requested action.

    ``--list-displays`` and ``--list-inputs`` print their listing and exit.
    Otherwise an audio device name is required (exit status -1 when it is
    missing) and the show is started.
    """
    argparser = ArgumentParser(
        description="beamshow - Render a light show for a video projector"
    )
    argparser.add_argument(
        "-a",
        "--audio",
        metavar="NAME",
        type=str,
        default="",
        help="The audio device to use. Can be any substring",
    )
    argparser.add_argument(
        "--3d",
        dest="render3d",
        action="store_true",
        help="Render a 3D preview instead of the normal output",
    )
    argparser.add_argument(
        "--list-displays", action="store_true", help="Show available displays"
    )
    argparser.add_argument(
        "--list-inputs", action="store_true", help="Show available audio inputs"
    )
    argparser.add_argument(
        "-w", "--window", action="store_true", help="Display in a window"
    )
    argparser.add_argument(
        "-b", "--beat", action="store_true", help="Effects should react to beats"
    )
    argparser.add_argument(
        "--trails", action="store_true", help="Fade patterns out (trail mode)"
    )
    argparser.add_argument(
        "--randomize",
        metavar="N",
        type=int,
        nargs="?",
        default=0,
        const=5,
        help="Select random effect presets after seconds",
    )
    argparser.add_argument("--fps", action="store_true", help="Show FPS in console")
    argparser.add_argument(
        "-d", "--display", type=int, default=0, help="ID of the display to use"
    )

    # Print a small banner after the pygame banner to separate our output
    # from theirs.
    rule = "-" * (len(str(argparser.description)) + 4)
    print("")
    print(rule)
    print(f"  {argparser.description}  ")
    print(rule)
    print("")

    args = argparser.parse_args()

    if args.list_displays:
        print_displays()
    if args.list_inputs:
        print_inputs()

    if not args.audio:
        print("Must select audio input")
        sys.exit(-1)

    show = Beamshow(
        audio_device_name=args.audio,
        render3d=args.render3d,
        window=args.window,
        trails=args.trails,
        randomize_interval=args.randomize,
        fps=args.fps,
        display=args.display,
        beat_reactive=args.beat,
    )
    show.main()


if __name__ == "__main__":
    app_main()