From 55436a31b327ac030228be13c10a45561593568a Mon Sep 17 00:00:00 2001 From: Patrick Moessler Date: Thu, 8 Jun 2023 01:30:53 +0200 Subject: [PATCH] restructure btn handling --- src/teams_ctrl.py | 216 +++++++++++++++++++++++++++++++--------------- 1 file changed, 146 insertions(+), 70 deletions(-) diff --git a/src/teams_ctrl.py b/src/teams_ctrl.py index 43f0323..b033355 100644 --- a/src/teams_ctrl.py +++ b/src/teams_ctrl.py @@ -10,23 +10,31 @@ from argparse import ArgumentParser from websockets.client import connect, WebSocketClientProtocol from pydantic import BaseModel -URL = "ws://localhost:8124" -URL_PARAMS = { - "protocol-version": "1.0.0", - "manufacturer": "TeamsCtrl", - "device": "TeamsCtrl", - "app": "TeamsCtrlApp", - "app-version": "1.4", -} -BUTTONS = {"cam": 1, "mic": 2} +class Button(BaseModel): + num: int + color_on: tuple[int, int, int] + color_off: tuple[int, int, int] -COLORS = ( - ((0, 10, 0), (0, 255, 0)), - ((10, 0, 0), (255, 0, 0)), - ((0, 0, 10), (0, 0, 255)), - ((10, 10, 10), (255, 255, 255)), -) + +class BUTTONS: + cam = Button(num=0, color_off=(0, 0, 10), color_on=(0, 0, 255)) + mic = Button(num=1, color_off=(10, 0, 0), color_on=(255, 0, 0)) + hand = Button(num=2, color_off=(10, 10, 0), color_on=(255, 192, 0)) + call = Button(num=3, color_off=(0, 10, 0), color_on=(255, 64, 0)) + all = [cam, mic, hand, call] + + +# class consts: + + +# BUTTONS = {"cam": 0, "mic": 1, "call": 3, "hand": 2} + +# COLORS = { +# "mic":, +# "hand":, +# "call":, +# } class MeetingPermissions(BaseModel): @@ -67,61 +75,109 @@ class State: def __init__(self, ctrl: "TeamsCtrl") -> None: self.ctrl = ctrl - def process(self) -> str | None: - time.sleep(0.1) + async def process(self) -> str | None: + await asyncio.sleep(0.1) return None - def handle_key(self, key_num: int, state: bool) -> str | None: + async def handle_key(self, key_num: int, state: bool) -> str | None: print(f"key:{key_num}, state:{state}") - if key_num == BUTTONS["mic"]: - self.ctrl.send_ws_cmd( - { - "apiVersion": "1.0.0", - "service": "toggle-mute", - "action": "toggle-mute", - "manufacturer": "Elgato", - "device": "StreamDeck", - "timestamp": 1675341775725, - } - ) - if key_num == BUTTONS["cam"]: - self.ctrl.send_ws_cmd( - { - "apiVersion": "1.0.0", - "service": "toggle-video", - "action": "toggle-video", - "manufacturer": "Elgato", - "device": "StreamDeck", - "timestamp": 1675341791249, - } - ) + if state is True: + msg = { + "apiVersion": "1.0.0", + "manufacturer": "Elgato", + "device": "StreamDeck", + "timestamp": time.time_ns() // 1000, + } + match key_num: + case BUTTONS.mic.num: + msg.update( + { + "service": "toggle-mute", + "action": "toggle-mute", + } + ) + case BUTTONS.cam.num: + msg.update( + { + "service": "toggle-video", + "action": "toggle-video", + } + ) + case BUTTONS.call.num: + msg.update( + { + "service": "call", + "action": "leave-call", + } + ) + case BUTTONS.hand.num: + msg.update( + { + "service": "raise-hand", + "action": "toggle-hand", + } + ) + case _: + return None + await self.ctrl.send_ws_cmd(msg) return None def handle_state_change(self, changes: dict[str, bool]) -> str | None: if "isInMeeting" in changes: if changes["isInMeeting"]: - for key_num in range(4): - self.ctrl.set_color(key_num=key_num, color=COLORS[key_num][1]) + for btn in BUTTONS.all: + self.ctrl.set_color( + key_num=btn.num, + color=btn.color_off, + ) else: - for key_num in range(4): - self.ctrl.set_color(key_num=key_num, color=(0, 0, 0)) + for btn in BUTTONS.all: + self.ctrl.set_color(key_num=btn.num, color=(0, 0, 0)) if "isMuted" in changes: - key_num = BUTTONS["mic"] + key_num = BUTTONS.mic.num if changes["isMuted"]: - self.ctrl.set_color(key_num=key_num, color=COLORS[key_num][1]) + self.ctrl.set_color(key_num=key_num, color=BUTTONS.mic.color_on) else: - self.ctrl.set_color(key_num=key_num, color=(0, 0, 0)) + self.ctrl.set_color(key_num=key_num, color=BUTTONS.mic.color_off) if "isCameraOn" in changes: - key_num = BUTTONS["cam"] + key_num = BUTTONS.cam.num if changes["isCameraOn"]: - self.ctrl.set_color(key_num=key_num, color=COLORS[key_num][1]) + self.ctrl.set_color(key_num=key_num, color=BUTTONS.cam.color_on) else: - self.ctrl.set_color(key_num=key_num, color=(0, 0, 0)) + self.ctrl.set_color(key_num=key_num, color=BUTTONS.cam.color_off) + if "isHandRaised" in changes: + key_num = BUTTONS.hand.num + if changes["isHandRaised"]: + self.ctrl.set_color(key_num=key_num, color=BUTTONS.hand.color_on) + else: + self.ctrl.set_color(key_num=key_num, color=BUTTONS.hand.color_off) + if "hasUnreadMessages" in changes: + if changes["hasUnreadMessages"]: + return "notify" return None -class InMeeting(State): - pass +class Notify(State): + async def process(self) -> str | None: + self.ctrl.set_color(key_num=BUTTONS.call.num, color=BUTTONS.call.color_on) + await asyncio.sleep(0.1) + self.ctrl.set_color(key_num=BUTTONS.call.num, color=BUTTONS.call.color_off) + await asyncio.sleep(0.2) + self.ctrl.set_color(key_num=BUTTONS.call.num, color=BUTTONS.call.color_on) + await asyncio.sleep(0.1) + self.ctrl.set_color(key_num=BUTTONS.call.num, color=BUTTONS.call.color_off) + await asyncio.sleep(0.6) + return None + + def handle_state_change(self, changes: dict[str, bool]) -> str | None: + super_state = super().handle_state_change(changes) + if "hasUnreadMessages" in changes: + if changes["hasUnreadMessages"] is False and "isInMeeting" not in changes: + self.ctrl.set_color( + key_num=BUTTONS.call.num, color=BUTTONS.call.color_off + ) + return "idle" + return super_state class TeamsCtrl: @@ -137,7 +193,7 @@ class TeamsCtrl: self.serial_reader, self.serial_writer = serial self.ws = ws - self.state_machine = {"idle": State(self), "inmeeting": InMeeting(self)} + self.state_machine = {"idle": State(self), "notify": Notify(self)} self.current_state_name = "idle" self.meeting_state: MeetingState | None = None @@ -156,8 +212,8 @@ class TeamsCtrl: print(cmd) self.serial_writer.write(cmd.encode("utf-8")) - async def process_serial(self): - while True: + async def process_serial(self) -> None: + while not self.loop.is_closed(): raw_msg = await self.serial_reader.readline() msg = raw_msg.decode("utf-8").strip() action = msg[0] @@ -172,13 +228,15 @@ class TeamsCtrl: key_state = False case _: raise ValueError(f"unknown action: {action}") - next_state = self.current_state.handle_key(key_num=key_num, state=key_state) + next_state = await self.current_state.handle_key( + key_num=key_num, state=key_state + ) if next_state is not None: self.set_next_state(next_state=next_state) - def send_ws_cmd(self, cmd: dict[str, Any]) -> None: + async def send_ws_cmd(self, cmd: dict[str, Any]) -> None: msg = json.dumps(cmd) - self.loop.run_until_complete(self.ws.send(msg)) + await self.ws.send(msg) async def process_ws(self) -> None: async for raw_msg in self.ws: @@ -202,6 +260,33 @@ class TeamsCtrl: if next_state is not None: self.set_next_state(next_state=next_state) + async def process_statemachine(self) -> None: + while not self.loop.is_closed(): + await self.current_state.process() + + def gather_all(self): + return asyncio.gather( + self.process_serial(), self.process_ws(), self.process_statemachine() + ) + + +async def amain(): + loop = asyncio.get_event_loop() + serial = await serial_asyncio.open_serial_connection( + loop=loop, url=args.port, baudrate=115200 + ) + + url = f"ws://localhost:8124?token={token}&protocol-version=1.0.0&manufacturer=TeamsCtrl&device=TeamsCtrl&app=TeamsCtrlApp&app-version=1.4" + ws = await connect(url) + + ctrl = TeamsCtrl(loop=loop, serial=serial, ws=ws) + try: + tasks = ctrl.gather_all() + await tasks + finally: + for btn in BUTTONS.all: + ctrl.set_color(key_num=btn.num, color=(0, 0, 0)) + if __name__ == "__main__": parser = ArgumentParser(description=__doc__) @@ -214,16 +299,7 @@ if __name__ == "__main__": with open(args.token) as tf: token = tf.read().strip() - loop = asyncio.get_event_loop() + asyncio.run(amain()) + # loop = asyncio.new_event_loop() - - serial = loop.run_until_complete( - serial_asyncio.open_serial_connection(loop=loop, url=args.port, baudrate=115200) - ) - - url = f"ws://localhost:8124?token={token}&protocol-version=1.0.0&manufacturer=TeamsCtrl&device=TeamsCtrl&app=TeamsCtrlApp&app-version=1.4" - ws = loop.run_until_complete(connect(url)) - - ctrl = TeamsCtrl(loop=loop, serial=serial, ws=ws) - loop.run_forever() - loop.close() + # asyncio.set_event_loop(loop)