import time import json from typing import Literal from websockets.sync.server import serve, ServerConnection from websockets.exceptions import ConnectionClosedError from threading import Thread from pydantic import BaseModel, Field class MeetingPermissions(BaseModel): canToggleMute: bool = False canToggleVideo: bool = False canToggleHand: bool = False canToggleBlur: bool = False canToggleRecord: bool = False canLeave: bool = False canReact: bool = False canToggleShareTray: bool = False canToggleChat: bool = False canStopSharing: bool = False canPair: bool = False class MeetingState(BaseModel): isMuted: bool = False isCameraOn: bool = False isHandRaised: bool = False isInMeeting: bool = False isRecordingOn: bool = False isBackgroundBlurred: bool = False hasUnreadMessages: bool = False class MeetingUpdate(BaseModel): meetingState: MeetingState = Field(default_factory=MeetingState) meetingPermissions: MeetingPermissions = Field(default_factory=MeetingPermissions) class TeamsMsg(BaseModel): apiVersion: Literal["1.0.0"] = "1.0.0" meetingUpdate: MeetingUpdate = Field(default_factory=MeetingUpdate) class TeamsSim: def __init__(self) -> None: self.meeting = TeamsMsg() self.rx_thread = Thread(target=self.rx_thread_run) self.ws: ServerConnection | None = None self.active: bool = True # { # "apiVersion": "1.0.0", # "service": "toggle-mute", # "action": "toggle-mute", # "manufacturer": "Elgato", # "device": "NkTeamsCtrl", # "timestamp": time.time_ns() // 1000, # } def rx_thread_run(self): assert self.ws is not None while self.active: try: raw_msg = self.ws.recv() except ConnectionClosedError: self.ws = None self.active = False break msg = json.loads(raw_msg) print(f">{msg}") match msg.get("action", ""): case "toggle-mute": self.meeting.meetingUpdate.meetingState.isMuted ^= True case "toggle-video": self.meeting.meetingUpdate.meetingState.isCameraOn ^= True case "toggle-hand": self.meeting.meetingUpdate.meetingState.isHandRaised ^= True case "leave-call": self.meeting.meetingUpdate.meetingState.isInMeeting = False case _: continue self.send_status() def send_status(self) -> None: if self.ws is None: return msg = self.meeting.json() print(f"<{msg}") self.ws.send(msg) def handler(self, websocket: ServerConnection): if self.rx_thread.is_alive(): self.active = False if self.ws is not None: self.ws.close_socket() self.rx_thread.join() self.rx_thread = Thread(target=self.rx_thread_run) self.ws = websocket self.active = True self.rx_thread.start() while True: time.sleep(0.5) if self.meeting.meetingUpdate.meetingState.isInMeeting is False: print('### not in meeting, "reconnecting" in 5') time.sleep(5) self.meeting.meetingUpdate.meetingState.isInMeeting = True self.send_status() time.sleep(5) self.meeting.meetingUpdate.meetingState.hasUnreadMessages = True self.send_status() time.sleep(5) self.meeting.meetingUpdate.meetingState.hasUnreadMessages = False self.send_status() def run(self): with serve(self.handler, host="localhost", port=8124) as server: server.serve_forever() if __name__ == "__main__": srv = TeamsSim() srv.run()