Compare commits

..

2 commits

Author SHA1 Message Date
Patrick Moessler
a6b2992cc2 add testserver, slightly broken 2023-06-08 01:31:03 +02:00
Patrick Moessler
55436a31b3 restructure btn handling 2023-06-08 01:30:53 +02:00
2 changed files with 270 additions and 70 deletions

View file

@ -10,23 +10,31 @@ from argparse import ArgumentParser
from websockets.client import connect, WebSocketClientProtocol from websockets.client import connect, WebSocketClientProtocol
from pydantic import BaseModel from pydantic import BaseModel
URL = "ws://localhost:8124"
URL_PARAMS = {
"protocol-version": "1.0.0",
"manufacturer": "TeamsCtrl",
"device": "TeamsCtrl",
"app": "TeamsCtrlApp",
"app-version": "1.4",
}
BUTTONS = {"cam": 1, "mic": 2} class Button(BaseModel):
num: int
color_on: tuple[int, int, int]
color_off: tuple[int, int, int]
COLORS = (
((0, 10, 0), (0, 255, 0)), class BUTTONS:
((10, 0, 0), (255, 0, 0)), cam = Button(num=0, color_off=(0, 0, 10), color_on=(0, 0, 255))
((0, 0, 10), (0, 0, 255)), mic = Button(num=1, color_off=(10, 0, 0), color_on=(255, 0, 0))
((10, 10, 10), (255, 255, 255)), hand = Button(num=2, color_off=(10, 10, 0), color_on=(255, 192, 0))
) call = Button(num=3, color_off=(0, 10, 0), color_on=(255, 64, 0))
all = [cam, mic, hand, call]
# class consts:
# BUTTONS = {"cam": 0, "mic": 1, "call": 3, "hand": 2}
# COLORS = {
# "mic":,
# "hand":,
# "call":,
# }
class MeetingPermissions(BaseModel): class MeetingPermissions(BaseModel):
@ -67,61 +75,109 @@ class State:
def __init__(self, ctrl: "TeamsCtrl") -> None: def __init__(self, ctrl: "TeamsCtrl") -> None:
self.ctrl = ctrl self.ctrl = ctrl
def process(self) -> str | None: async def process(self) -> str | None:
time.sleep(0.1) await asyncio.sleep(0.1)
return None return None
def handle_key(self, key_num: int, state: bool) -> str | None: async def handle_key(self, key_num: int, state: bool) -> str | None:
print(f"key:{key_num}, state:{state}") print(f"key:{key_num}, state:{state}")
if key_num == BUTTONS["mic"]: if state is True:
self.ctrl.send_ws_cmd( msg = {
{ "apiVersion": "1.0.0",
"apiVersion": "1.0.0", "manufacturer": "Elgato",
"service": "toggle-mute", "device": "StreamDeck",
"action": "toggle-mute", "timestamp": time.time_ns() // 1000,
"manufacturer": "Elgato", }
"device": "StreamDeck", match key_num:
"timestamp": 1675341775725, case BUTTONS.mic.num:
} msg.update(
) {
if key_num == BUTTONS["cam"]: "service": "toggle-mute",
self.ctrl.send_ws_cmd( "action": "toggle-mute",
{ }
"apiVersion": "1.0.0", )
"service": "toggle-video", case BUTTONS.cam.num:
"action": "toggle-video", msg.update(
"manufacturer": "Elgato", {
"device": "StreamDeck", "service": "toggle-video",
"timestamp": 1675341791249, "action": "toggle-video",
} }
) )
case BUTTONS.call.num:
msg.update(
{
"service": "call",
"action": "leave-call",
}
)
case BUTTONS.hand.num:
msg.update(
{
"service": "raise-hand",
"action": "toggle-hand",
}
)
case _:
return None
await self.ctrl.send_ws_cmd(msg)
return None return None
def handle_state_change(self, changes: dict[str, bool]) -> str | None: def handle_state_change(self, changes: dict[str, bool]) -> str | None:
if "isInMeeting" in changes: if "isInMeeting" in changes:
if changes["isInMeeting"]: if changes["isInMeeting"]:
for key_num in range(4): for btn in BUTTONS.all:
self.ctrl.set_color(key_num=key_num, color=COLORS[key_num][1]) self.ctrl.set_color(
key_num=btn.num,
color=btn.color_off,
)
else: else:
for key_num in range(4): for btn in BUTTONS.all:
self.ctrl.set_color(key_num=key_num, color=(0, 0, 0)) self.ctrl.set_color(key_num=btn.num, color=(0, 0, 0))
if "isMuted" in changes: if "isMuted" in changes:
key_num = BUTTONS["mic"] key_num = BUTTONS.mic.num
if changes["isMuted"]: if changes["isMuted"]:
self.ctrl.set_color(key_num=key_num, color=COLORS[key_num][1]) self.ctrl.set_color(key_num=key_num, color=BUTTONS.mic.color_on)
else: else:
self.ctrl.set_color(key_num=key_num, color=(0, 0, 0)) self.ctrl.set_color(key_num=key_num, color=BUTTONS.mic.color_off)
if "isCameraOn" in changes: if "isCameraOn" in changes:
key_num = BUTTONS["cam"] key_num = BUTTONS.cam.num
if changes["isCameraOn"]: if changes["isCameraOn"]:
self.ctrl.set_color(key_num=key_num, color=COLORS[key_num][1]) self.ctrl.set_color(key_num=key_num, color=BUTTONS.cam.color_on)
else: else:
self.ctrl.set_color(key_num=key_num, color=(0, 0, 0)) self.ctrl.set_color(key_num=key_num, color=BUTTONS.cam.color_off)
if "isHandRaised" in changes:
key_num = BUTTONS.hand.num
if changes["isHandRaised"]:
self.ctrl.set_color(key_num=key_num, color=BUTTONS.hand.color_on)
else:
self.ctrl.set_color(key_num=key_num, color=BUTTONS.hand.color_off)
if "hasUnreadMessages" in changes:
if changes["hasUnreadMessages"]:
return "notify"
return None return None
class InMeeting(State): class Notify(State):
pass async def process(self) -> str | None:
self.ctrl.set_color(key_num=BUTTONS.call.num, color=BUTTONS.call.color_on)
await asyncio.sleep(0.1)
self.ctrl.set_color(key_num=BUTTONS.call.num, color=BUTTONS.call.color_off)
await asyncio.sleep(0.2)
self.ctrl.set_color(key_num=BUTTONS.call.num, color=BUTTONS.call.color_on)
await asyncio.sleep(0.1)
self.ctrl.set_color(key_num=BUTTONS.call.num, color=BUTTONS.call.color_off)
await asyncio.sleep(0.6)
return None
def handle_state_change(self, changes: dict[str, bool]) -> str | None:
super_state = super().handle_state_change(changes)
if "hasUnreadMessages" in changes:
if changes["hasUnreadMessages"] is False and "isInMeeting" not in changes:
self.ctrl.set_color(
key_num=BUTTONS.call.num, color=BUTTONS.call.color_off
)
return "idle"
return super_state
class TeamsCtrl: class TeamsCtrl:
@ -137,7 +193,7 @@ class TeamsCtrl:
self.serial_reader, self.serial_writer = serial self.serial_reader, self.serial_writer = serial
self.ws = ws self.ws = ws
self.state_machine = {"idle": State(self), "inmeeting": InMeeting(self)} self.state_machine = {"idle": State(self), "notify": Notify(self)}
self.current_state_name = "idle" self.current_state_name = "idle"
self.meeting_state: MeetingState | None = None self.meeting_state: MeetingState | None = None
@ -156,8 +212,8 @@ class TeamsCtrl:
print(cmd) print(cmd)
self.serial_writer.write(cmd.encode("utf-8")) self.serial_writer.write(cmd.encode("utf-8"))
async def process_serial(self): async def process_serial(self) -> None:
while True: while not self.loop.is_closed():
raw_msg = await self.serial_reader.readline() raw_msg = await self.serial_reader.readline()
msg = raw_msg.decode("utf-8").strip() msg = raw_msg.decode("utf-8").strip()
action = msg[0] action = msg[0]
@ -172,13 +228,15 @@ class TeamsCtrl:
key_state = False key_state = False
case _: case _:
raise ValueError(f"unknown action: {action}") raise ValueError(f"unknown action: {action}")
next_state = self.current_state.handle_key(key_num=key_num, state=key_state) next_state = await self.current_state.handle_key(
key_num=key_num, state=key_state
)
if next_state is not None: if next_state is not None:
self.set_next_state(next_state=next_state) self.set_next_state(next_state=next_state)
def send_ws_cmd(self, cmd: dict[str, Any]) -> None: async def send_ws_cmd(self, cmd: dict[str, Any]) -> None:
msg = json.dumps(cmd) msg = json.dumps(cmd)
self.loop.run_until_complete(self.ws.send(msg)) await self.ws.send(msg)
async def process_ws(self) -> None: async def process_ws(self) -> None:
async for raw_msg in self.ws: async for raw_msg in self.ws:
@ -202,6 +260,33 @@ class TeamsCtrl:
if next_state is not None: if next_state is not None:
self.set_next_state(next_state=next_state) self.set_next_state(next_state=next_state)
async def process_statemachine(self) -> None:
while not self.loop.is_closed():
await self.current_state.process()
def gather_all(self):
return asyncio.gather(
self.process_serial(), self.process_ws(), self.process_statemachine()
)
async def amain():
loop = asyncio.get_event_loop()
serial = await serial_asyncio.open_serial_connection(
loop=loop, url=args.port, baudrate=115200
)
url = f"ws://localhost:8124?token={token}&protocol-version=1.0.0&manufacturer=TeamsCtrl&device=TeamsCtrl&app=TeamsCtrlApp&app-version=1.4"
ws = await connect(url)
ctrl = TeamsCtrl(loop=loop, serial=serial, ws=ws)
try:
tasks = ctrl.gather_all()
await tasks
finally:
for btn in BUTTONS.all:
ctrl.set_color(key_num=btn.num, color=(0, 0, 0))
if __name__ == "__main__": if __name__ == "__main__":
parser = ArgumentParser(description=__doc__) parser = ArgumentParser(description=__doc__)
@ -214,16 +299,7 @@ if __name__ == "__main__":
with open(args.token) as tf: with open(args.token) as tf:
token = tf.read().strip() token = tf.read().strip()
loop = asyncio.get_event_loop() asyncio.run(amain())
# loop = asyncio.new_event_loop() # loop = asyncio.new_event_loop()
# asyncio.set_event_loop(loop)
serial = loop.run_until_complete(
serial_asyncio.open_serial_connection(loop=loop, url=args.port, baudrate=115200)
)
url = f"ws://localhost:8124?token={token}&protocol-version=1.0.0&manufacturer=TeamsCtrl&device=TeamsCtrl&app=TeamsCtrlApp&app-version=1.4"
ws = loop.run_until_complete(connect(url))
ctrl = TeamsCtrl(loop=loop, serial=serial, ws=ws)
loop.run_forever()
loop.close()

124
tests/testserver.py Normal file
View file

@ -0,0 +1,124 @@
import time
import json
from typing import Literal
from websockets.sync.server import serve, ServerConnection
from websockets.exceptions import ConnectionClosedError
from threading import Thread
from pydantic import BaseModel, Field
class MeetingPermissions(BaseModel):
canToggleMute: bool = False
canToggleVideo: bool = False
canToggleHand: bool = False
canToggleBlur: bool = False
canToggleRecord: bool = False
canLeave: bool = False
canReact: bool = False
canToggleShareTray: bool = False
canToggleChat: bool = False
canStopSharing: bool = False
canPair: bool = False
class MeetingState(BaseModel):
isMuted: bool = False
isCameraOn: bool = False
isHandRaised: bool = False
isInMeeting: bool = False
isRecordingOn: bool = False
isBackgroundBlurred: bool = False
hasUnreadMessages: bool = False
class MeetingUpdate(BaseModel):
meetingState: MeetingState = Field(default_factory=MeetingState)
meetingPermissions: MeetingPermissions = Field(default_factory=MeetingPermissions)
class TeamsMsg(BaseModel):
apiVersion: Literal["1.0.0"] = "1.0.0"
meetingUpdate: MeetingUpdate = Field(default_factory=MeetingUpdate)
class TeamsSim:
def __init__(self) -> None:
self.meeting = TeamsMsg()
self.rx_thread = Thread(target=self.rx_thread_run)
self.ws: ServerConnection | None = None
self.active: bool = True
# {
# "apiVersion": "1.0.0",
# "service": "toggle-mute",
# "action": "toggle-mute",
# "manufacturer": "Elgato",
# "device": "NkTeamsCtrl",
# "timestamp": time.time_ns() // 1000,
# }
def rx_thread_run(self):
assert self.ws is not None
while self.active:
try:
raw_msg = self.ws.recv()
except ConnectionClosedError:
self.ws = None
self.active = False
break
msg = json.loads(raw_msg)
print(f">{msg}")
match msg.get("action", ""):
case "toggle-mute":
self.meeting.meetingUpdate.meetingState.isMuted ^= True
case "toggle-video":
self.meeting.meetingUpdate.meetingState.isCameraOn ^= True
case "toggle-hand":
self.meeting.meetingUpdate.meetingState.isHandRaised ^= True
case "leave-call":
self.meeting.meetingUpdate.meetingState.isInMeeting = False
case _:
continue
self.send_status()
def send_status(self) -> None:
if self.ws is None:
return
msg = self.meeting.json()
print(f"<{msg}")
self.ws.send(msg)
def handler(self, websocket: ServerConnection):
if self.rx_thread.is_alive():
self.active = False
if self.ws is not None:
self.ws.close_socket()
self.rx_thread.join()
self.rx_thread = Thread(target=self.rx_thread_run)
self.ws = websocket
self.active = True
self.rx_thread.start()
while True:
time.sleep(0.5)
if self.meeting.meetingUpdate.meetingState.isInMeeting is False:
print('### not in meeting, "reconnecting" in 5')
time.sleep(5)
self.meeting.meetingUpdate.meetingState.isInMeeting = True
self.send_status()
time.sleep(5)
self.meeting.meetingUpdate.meetingState.hasUnreadMessages = True
self.send_status()
time.sleep(5)
self.meeting.meetingUpdate.meetingState.hasUnreadMessages = False
self.send_status()
def run(self):
with serve(self.handler, host="localhost", port=8124) as server:
server.serve_forever()
if __name__ == "__main__":
srv = TeamsSim()
srv.run()