# Module state shared by create_rx_stream() / rx_pump() / set_handler().
_rx_stream = None    # generator returned by rx_packet(); set by create_rx_stream()
_pkt_handlers = {}   # packet type -> callable taking the decoded payload


class Packet:
    """Framing constants and the packet type codes accepted by rx_packet()."""

    SOP = 0x2B  # start-of-packet marker; also the seed of the XOR checksum
    MAX_LENGTH = 255  # TBD

    # Packet type codes.
    INPUT_JOYSTICK = 0x01
    INPUT_WHEEL = 0x03
    EFFECT_STATES = 0x02
    SET_CONTROL = 0x40
    CONTROL = 0x41
    SET_DEVICE_STATE = 0x42
    SET_OVERALL = 0x43
    QUERY = 0xFF

    # Frames whose type byte is not listed here are dropped by rx_packet().
    Types = [
        INPUT_JOYSTICK,
        INPUT_WHEEL,
        EFFECT_STATES,
        SET_CONTROL,
        CONTROL,
        SET_DEVICE_STATE,
        SET_OVERALL,
        QUERY
    ]


class ControlType:
    """Control selector codes (presumably for SET_CONTROL / CONTROL packets)."""

    DEAD_ZONE = 0x00
    IGNORE_DEADMAN_SWITCH = 0x01
    ENABLE_COMM_WATCHDOG = 0x02
    SET_SPRING_STRENGTH = 0x03
    ENABLE_SPRING = 0x04
    SET_AXIS_SATURATION = 0x05


class OverallControlType:
    """Selector codes (presumably for SET_OVERALL packets)."""

    GAIN = 0x00


class DeviceState:
    """Device state codes (presumably for SET_DEVICE_STATE packets)."""

    PAUSE_FFB = 0x08
    ENABLE_FFB = 0x04
    STOP_ALL = 0x01


class Query:
    """Query codes; each value is the ASCII code of a mnemonic letter."""

    # Queries
    BufferSize = 0x42       # ('B'uffer size)
    Manufacturer = 0x4d     # ('M'anufacturer)
    Product = 0x50          # ('P'roduct)
    Version = 0x56          # ('V'ersion)
    NumberOfEffects = 0x4e  # ('N'umber of effects)
    Effect = 0x45           # ('E')ffect
    # Control
    Open = 0x4f             # ('O'pen)
    Close = 0x43            # ('C')lose


def tx_packet(command, data):
    """Yield the values of one framed packet, in transmission order.

    Layout: SOP, command, len(data), each item of ``data``, checksum.
    The checksum is the XOR of every preceding value, so XOR-ing a whole
    frame (checksum included) gives 0.

    Args:
        command: packet type code (see ``Packet``).
        data: sized sequence of integer payload values.
    """
    cs = Packet.SOP
    yield Packet.SOP

    cs ^= command
    yield command

    length = len(data)
    cs ^= length
    yield length

    for d in data:
        cs ^= d
        yield d

    yield cs


def rx_packet(instream):
    """Parse framed packets out of a stream of integer byte values.

    Values are skipped until a SOP is seen, then the type, length, payload
    and checksum are read. Frames with a type that is not in
    ``Packet.Types`` or with a length of ``Packet.MAX_LENGTH`` or more are
    dropped and the parser looks for the next SOP.

    Args:
        instream: iterator (or any iterable) producing integer byte values.

    Yields:
        ``(pkt_type, data, ok)`` where ``data`` is a list of payload values
        and ``ok`` is True when the XOR checksum of the frame is zero.

    The generator finishes normally when ``instream`` is exhausted; a
    partially received trailing frame is discarded.
    """
    # Accept plain iterables (list, bytes, ...) as well as iterators.
    source = instream if hasattr(instream, '__next__') else iter(instream)

    try:
        while instream:
            # sync start: discard everything up to and including the SOP
            while next(source) != Packet.SOP:
                pass

            # packet type
            pkt_type = next(source)
            # SOP is not a valid type, so a SOP here can only be the real
            # start of a packet; keep it as the start instead of losing it.
            while pkt_type == Packet.SOP:
                pkt_type = next(source)
            if pkt_type not in Packet.Types:
                continue

            # packet length
            pkt_len = next(source)
            if pkt_len >= Packet.MAX_LENGTH:
                # Corrupt or unsupported length: drop the frame and resync
                # rather than aborting the whole receive generator.
                continue

            # data
            cs = Packet.SOP ^ pkt_type ^ pkt_len
            data = []
            for _ in range(pkt_len):
                d = next(source)
                cs ^= d
                data.append(d)

            # checksum: XOR over the complete frame must come out as 0
            cs ^= next(source)

            yield (pkt_type, data, cs == 0)
    except StopIteration:
        # Input ended. Letting StopIteration escape a generator would be
        # converted to RuntimeError (PEP 479), so end the generator here.
        return


def create_rx_stream(input_stream):
    """Install ``input_stream`` as the source that rx_pump() reads from."""
    global _rx_stream
    _rx_stream = rx_packet(input_stream)


def rx_pump():
    """Receive one packet and dispatch it if its checksum is valid.

    Packets with a bad checksum are silently dropped.

    Raises:
        StopIteration: the input stream is exhausted.
        TypeError: create_rx_stream() has not been called yet.
    """
    command, data, ok = next(_rx_stream)
    if ok:
        handle_pkt(command, data)


def set_handler(pkt_type, handler):
    """Register ``handler`` as the callback for packets of ``pkt_type``.

    The handler is called with a single argument: the decoded payload
    produced by handle_pkt(). A later registration replaces an earlier one.
    """
    _pkt_handlers[pkt_type] = handler


def _decode_wheel_input(data):
    """Decode the payload of an INPUT_WHEEL packet into a nested dict."""
    buttons = data[5]
    # Only the upper nibble of byte 6 carries the hat position.
    hat = data[6] & 0xF0
    return {
        'axis': {
            # bytes 0-1: signed little-endian position, scaled by 1/2048
            'wheel': int.from_bytes(data[0:2], byteorder='little',
                                    signed=True) / 2048,
            # pedal bytes are inverted: raw 255 -> 0.0, raw 0 -> 1.0
            'gas': 1 - data[2] / 255,
            'brake': 1 - data[3] / 255
        },
        'buttons': {
            'B': bool(buttons & 0x20),
            'A': bool(buttons & 0x10),
            '4': bool(buttons & 0x08),
            '3': bool(buttons & 0x04),
            '2_PaddleDown': bool(buttons & 0x02),
            '1_PaddleUp_Start': bool(buttons & 0x01)
        },
        'hat': {
            'center': hat == 0xF0,
            'up': hat == 0x00,
            'down': hat == 0x40,
            'left': hat == 0x60,
            'right': hat == 0x20
        }
    }


def handle_pkt(command, data):
    """Decode a received payload and pass it to the registered handler.

    * ``Packet.INPUT_WHEEL`` -> dict with ``'axis'``, ``'buttons'`` and
      ``'hat'`` entries (payload must hold at least 7 values).
    * ``Packet.QUERY`` -> ``{'query': data[0], 'data': data[1:]}``.
    * any other type -> ``data`` is passed through unchanged.

    Nothing is decoded and nothing happens when no handler is registered
    for ``command``.
    """
    handler = _pkt_handlers.get(command)
    if not handler:
        return

    if command == Packet.INPUT_WHEEL:
        output = _decode_wheel_input(data)
    elif command == Packet.QUERY:
        output = {
            'query': data[0],
            'data': data[1:]
        }
    else:
        output = data

    handler(output)