teams_ctrl/tests/testserver.py
2023-06-08 01:31:03 +02:00

124 lines
3.9 KiB
Python

import json
import time
from threading import Thread
from typing import Literal

from pydantic import BaseModel, Field
from websockets.exceptions import ConnectionClosed, ConnectionClosedError
from websockets.sync.server import ServerConnection, serve
class MeetingPermissions(BaseModel):
    """Boolean ``can*`` flags carried in the ``meetingPermissions`` section.

    Every flag defaults to ``False``. The field names are part of the
    serialized message and must not be renamed or reordered.

    NOTE(review): judging by the names, each flag presumably tells the client
    whether the matching action is currently allowed -- confirm against the
    API this file simulates.
    """

    canToggleMute: bool = False
    canToggleVideo: bool = False
    canToggleHand: bool = False
    canToggleBlur: bool = False
    canToggleRecord: bool = False
    canLeave: bool = False
    canReact: bool = False
    canToggleShareTray: bool = False
    canToggleChat: bool = False
    canStopSharing: bool = False
    canPair: bool = False
class MeetingState(BaseModel):
    """Boolean ``is*``/``has*`` flags carried in the ``meetingState`` section.

    Every flag defaults to ``False``. The field names are part of the
    serialized message and must not be renamed or reordered.
    """

    # Flipped by the "toggle-mute" / "toggle-video" / "toggle-hand" actions
    # handled in TeamsSim.rx_thread_run.
    isMuted: bool = False
    isCameraOn: bool = False
    isHandRaised: bool = False
    # Cleared by the "leave-call" action; set again by TeamsSim.handler.
    isInMeeting: bool = False
    isRecordingOn: bool = False
    isBackgroundBlurred: bool = False
    # Switched on and off by TeamsSim.handler.
    hasUnreadMessages: bool = False
class MeetingUpdate(BaseModel):
    """Container pairing the meeting state with the meeting permissions.

    Both members use ``default_factory`` so that each ``MeetingUpdate`` gets
    its own freshly constructed, all-default sub-model.
    """

    meetingState: MeetingState = Field(default_factory=MeetingState)
    meetingPermissions: MeetingPermissions = Field(
        default_factory=MeetingPermissions
    )
class TeamsMsg(BaseModel):
    """Top-level message: a fixed API version plus one ``MeetingUpdate``.

    ``apiVersion`` is typed as a single-value ``Literal``, so ``"1.0.0"`` is
    the only value the model accepts.
    """

    apiVersion: Literal["1.0.0"] = "1.0.0"
    meetingUpdate: MeetingUpdate = Field(default_factory=MeetingUpdate)
class TeamsSim:
def __init__(self) -> None:
self.meeting = TeamsMsg()
self.rx_thread = Thread(target=self.rx_thread_run)
self.ws: ServerConnection | None = None
self.active: bool = True
# {
# "apiVersion": "1.0.0",
# "service": "toggle-mute",
# "action": "toggle-mute",
# "manufacturer": "Elgato",
# "device": "NkTeamsCtrl",
# "timestamp": time.time_ns() // 1000,
# }
def rx_thread_run(self):
assert self.ws is not None
while self.active:
try:
raw_msg = self.ws.recv()
except ConnectionClosedError:
self.ws = None
self.active = False
break
msg = json.loads(raw_msg)
print(f">{msg}")
match msg.get("action", ""):
case "toggle-mute":
self.meeting.meetingUpdate.meetingState.isMuted ^= True
case "toggle-video":
self.meeting.meetingUpdate.meetingState.isCameraOn ^= True
case "toggle-hand":
self.meeting.meetingUpdate.meetingState.isHandRaised ^= True
case "leave-call":
self.meeting.meetingUpdate.meetingState.isInMeeting = False
case _:
continue
self.send_status()
def send_status(self) -> None:
if self.ws is None:
return
msg = self.meeting.json()
print(f"<{msg}")
self.ws.send(msg)
def handler(self, websocket: ServerConnection):
if self.rx_thread.is_alive():
self.active = False
if self.ws is not None:
self.ws.close_socket()
self.rx_thread.join()
self.rx_thread = Thread(target=self.rx_thread_run)
self.ws = websocket
self.active = True
self.rx_thread.start()
while True:
time.sleep(0.5)
if self.meeting.meetingUpdate.meetingState.isInMeeting is False:
print('### not in meeting, "reconnecting" in 5')
time.sleep(5)
self.meeting.meetingUpdate.meetingState.isInMeeting = True
self.send_status()
time.sleep(5)
self.meeting.meetingUpdate.meetingState.hasUnreadMessages = True
self.send_status()
time.sleep(5)
self.meeting.meetingUpdate.meetingState.hasUnreadMessages = False
self.send_status()
def run(self):
with serve(self.handler, host="localhost", port=8124) as server:
server.serve_forever()
if __name__ == "__main__":
    # Script entry point: build the simulator and serve until interrupted.
    simulator = TeamsSim()
    simulator.run()