From 9812b0ff7ca97ddb04d37113fed34305725d6fb9 Mon Sep 17 00:00:00 2001 From: "S. Seegel" Date: Sun, 6 Dec 2020 21:53:58 +0100 Subject: [PATCH] add Brennenstuhl RCS1000 refactoring --- apps/rcprotocols.py | 187 +++++++++++++++++++++----------------------- apps/rcpulsegw.py | 0 2 files changed, 89 insertions(+), 98 deletions(-) mode change 100644 => 100755 apps/rcpulsegw.py diff --git a/apps/rcprotocols.py b/apps/rcprotocols.py index f580f60..0c53419 100644 --- a/apps/rcprotocols.py +++ b/apps/rcprotocols.py @@ -5,6 +5,12 @@ import threading import time PARAM_ID = ('i', 'id') +PARAM_HOUSE = ('o', 'house') +PARAM_GROUP = ('g', 'group') +PARAM_UNIT = ('u', 'unit') +PARAM_COMMAND = ('a', 'command') +PARAM_CODE = ('c', 'code') +PARAM_DIPS = ('d', 'dips') class RcProtocol: def __init__(self): @@ -104,7 +110,19 @@ class RcProtocol: return dec, int(1.0 * sumpulse / sumstep), rep return None, None, None - + + def _encode_command(self, command): + if command in self._commands: + return self._commands[command] + else: + raise Exception("Invalid command") + + def _decode_command(self, symbols): + for k in self._commands: + if self._commands[k] == symbols: + return k + raise Exception("Unknown command") + def _build_frame(self, symbols, timebase=None, repetitions=None): self._reset() @@ -159,9 +177,7 @@ class Tristate(TristateBase): #old intertechno def __init__(self): self._name = "tristate" TristateBase.__init__(self) - self.params = [ - ('c', 'code'), - ] + self.params = [PARAM_CODE] def encode(self, params, timebase=None, repetitions=None): return self._build_frame(params["code"], timebase, repetitions) @@ -175,12 +191,8 @@ class ITTristate(TristateBase): #old intertechno def __init__(self): self._name = "ittristate" TristateBase.__init__(self) - self.params = [ - ('o', 'house'), #A-P - ('g', 'group'), #1-4 - ('u', 'unit'), #1-4 - ('a', 'command'), #on|off - ] + self.params = [PARAM_HOUSE, PARAM_GROUP, PARAM_UNIT, PARAM_COMMAND] + self._commands = {"on": "FF", "off": "F0"} def encode(self, params, timebase=None, repetitions=None): symbols = "" @@ -188,7 +200,7 @@ class ITTristate(TristateBase): #old intertechno symbols += self._encode_int(int(params["unit"]) - 1, 2) symbols += self._encode_int(int(params["group"]) - 1, 2) symbols += "0f" - symbols += "ff" if params["command"] == "on" else "f0" + symbols += self._encode_command(params["command"]) return self._build_frame(symbols, timebase, repetitions) def decode(self, pulsetrain): @@ -198,7 +210,43 @@ class ITTristate(TristateBase): #old intertechno "house": chr(self._decode_int(symbols[:4]) + ord('A')), "unit": self._decode_int(symbols[4:6]) + 1, "group": self._decode_int(symbols[6:8]) + 1, - "command": "on" if symbols[10:12].upper() == "FF" else "off", + "command": self._decode_command(symbols[10:12].upper()), + } + return self._name, params, tb, rep + +class Brennenstuhl(TristateBase): #old intertechno + def __init__(self): + self._name = "brennenstuhl" + TristateBase.__init__(self) + self.params = [PARAM_DIPS, PARAM_UNIT, PARAM_COMMAND] + self._commands = {"on": "0F", "off": "F0"} + + def encode(self, params, timebase=None, repetitions=None): + symbols = "" + for c in params["dips"]: + symbols += '0' if c == '1' else 'F' + for i in range(4): + symbols += '0' if (int(params["unit"]) - 1) == i else 'F' + symbols += "F" + symbols += self._encode_command(params["command"]) + return self._build_frame(symbols, timebase, repetitions) + + def decode(self, pulsetrain): + symbols, tb, rep = self._decode_symbols(pulsetrain[0:-2]) + if symbols: + dips = "" + for s in symbols[0:5]: + dips += "1" if s == '0' else "0" + unit = 0 + for u in symbols[5:9]: + unit += 1 + if u == '0': + break + + params = { + "dips": dips, + "unit": unit, + "command": self._decode_command(symbols[10:12].upper()), } return self._name, params, tb, rep @@ -226,11 +274,8 @@ class Intertechno(PPM1): PPM1.__init__(self) self._name = "intertechno" self._timebase = 275 - self.params = [ - PARAM_ID, - ('u', 'unit'), - ('a', 'command'), - ] + self.params = [PARAM_ID, PARAM_UNIT, PARAM_COMMAND] + self._commands = {"on": "1", "off": "0"} def _encode_unit(self, unit): return "{:04b}".format(int(unit) - 1) @@ -242,7 +287,7 @@ class Intertechno(PPM1): symbols = "" symbols += "{:026b}".format(int(params["id"])) symbols += "0" #group - symbols += "1" if params["command"] == "on" else "0" + symbols += self._encode_command(params["command"]) symbols += self._encode_unit(params["unit"]) return self._build_frame(symbols, timebase, repetitions) @@ -252,7 +297,7 @@ class Intertechno(PPM1): params = { "id": int(symbols[:26], 2), "unit": self._decode_unit(symbols[28:32]), - "command": "on" if symbols[27] == "1" else "off" + "command": self._decode_command(symbols[27]) } return self._name, params, tb, rep @@ -291,11 +336,8 @@ class Logilight(PWM1): def __init__(self): PWM1.__init__(self) self._name = "logilight" - self.params = [ - PARAM_ID, - ('u', 'unit'), - ('a', 'command'), - ] + self.params = [PARAM_ID, PARAM_UNIT, PARAM_COMMAND] + self._commands = {"on": "1", "learn": "1", "off": "0"} def _encode_unit(self, unit): res = "" @@ -317,7 +359,7 @@ class Logilight(PWM1): def encode(self, params, timebase=None, repetitions=None): symbols = "" symbols += "{:020b}".format(int(params["id"])) - symbols += "1" if params["command"] in ["on", "learn"] else "0" + symbols += self._encode_command(params["command"]) symbols += self._encode_unit(params["unit"]) if (params["command"] == "learn"): repetitions = 10 @@ -329,7 +371,7 @@ class Logilight(PWM1): params = { "id": int(symbols[:20], 2), "unit": self._decode_unit(symbols[21:24]), - "command": "on" if symbols[20] == "1" else "off" + "command": self._decode_command(symbols[20]) } return self._name, params, tb, rep @@ -337,34 +379,21 @@ class Emylo(PWM1): def __init__(self): PWM1.__init__(self) self._name = "emylo" - self.params = [ - PARAM_ID, - ('k', 'key'), - ] - self.__keys = {'A': '0001', 'B': '0010', 'C': '0100', 'D': '1000'} + self.params = [PARAM_ID, PARAM_COMMAND] + self._commands = {'A': '0001', 'B': '0010', 'C': '0100', 'D': '1000'} def encode(self, params, timebase=None, repetitions=None): symbols = "" symbols += "{:020b}".format(int(params["id"])) - if not params["key"] in self.__keys: - return - symbols += self.__keys[params["key"]] + symbols += self._encode_command(params["command"]) return self._build_frame(symbols, timebase, repetitions) def decode(self, pulsetrain): symbols, tb, rep = self._decode_symbols(pulsetrain[:-2]) if symbols: - key = None - for k in self.__keys: - if self.__keys[k] == symbols[-4:]: - key = k - break - if key is None: - return - params = { "id": int(symbols[:20], 2), - "key": key, + "command": self._decode_command(symbols[-4:]) } return self._name, params, tb, rep @@ -380,11 +409,7 @@ class FS20(RcProtocol): } self._header = [2, 2] * 12 + [3, 3] self._footer = [1, 100] - self.params = [ - PARAM_ID, - ('u', 'unit'), - ('a', 'command') - ] + self.params = [PARAM_ID,PARAM_UNIT, PARAM_COMMAND] RcProtocol.__init__(self) def __encode_byte(self, b): @@ -426,7 +451,6 @@ class Voltcraft(RcProtocol): Pulse in middle of a symbol: 0, end of symbol: 1 Used by Voltcraft RC30 ''' - __commands = {0: "off", 1: "alloff", 2: "on", 3: "allon", 5: "dimup", 7: "dimdown"} def __init__(self): self._name = "voltcraft" self._timebase = 600 @@ -438,28 +462,19 @@ class Voltcraft(RcProtocol): } self._header = [1] self._footer = [132] - self.params = [ - PARAM_ID, - ('u', 'unit'), - ('a', 'command') - ] + self.params = [PARAM_ID, PARAM_UNIT, PARAM_COMMAND] + self._commands = {"off": "000", "alloff": "100", "on": "010", "allon": "110", "dimup": "101", "dimdown": "111"} RcProtocol.__init__(self) def encode(self, params, timebase=None, repetitions=None): - cmd = None - for k in self.__commands: - if self.__commands[k] == params["command"]: - cmd = k - break - - if not (cmd in [0, 2]): - unit = 3 - else: + if params["command"] in ["on", "off"]: unit = int(params["unit"])-1 + else: + unit = 3 symbols = "{:012b}".format(int(params["id"]))[::-1] symbols += "{:02b}".format(unit)[::-1] - symbols += "{:03b}".format(cmd)[::-1] + symbols += self._encode_command(params["command"]) symbols += "0" symbols += "1" if (symbols[12] == "1") ^ (symbols[14] == "1") ^ (symbols[16] == "1") else "0" symbols += "1" if (symbols[13] == "1") ^ (symbols[15] == "1") ^ (symbols[17] == "1") else "0" @@ -471,7 +486,7 @@ class Voltcraft(RcProtocol): params = { "id": int(symbols[0:12][::-1], 2), "unit": int(symbols[12:14][::-1], 2) + 1, - "command": self.__commands[int(symbols[14:17][::-1], 2)] + "command": self._decode_command(symbols[14:17]) } return self._name, params, tb, rep @@ -492,31 +507,9 @@ class PWM2(RcProtocol): '0': [2, 1], } self._footer = [1, 11] - self.params = [ - ('c', 'code') - ] + self.params = [PARAM_CODE] RcProtocol.__init__(self) - def encode(self, params, timebase=None, repetitions=None): - symbols = params["code"] - return self._build_frame(symbols, timebase, repetitions) - - def decode(self, pulsetrain): - symbols, tb, rep = self._decode_symbols(pulsetrain[:-2]) - - if symbols: - x1 = symbols[2:8] - x2 = x1[5] + x1[4] + x1[3] + x1[2] + x1[1] + x1[0] - print(symbols, symbols[2:4], symbols[4:8], int(x1, 2), int(x2, 2)) - return - - if symbols: - return { - "protocol": self._name, - "code": code, - "timebase": tb, - }, rep - class PilotaCasa(PWM2): __codes = { '110001': (1, 1, 'on'), '111110': (1, 1, 'off'), @@ -536,12 +529,7 @@ class PilotaCasa(PWM2): def __init__(self): PWM2.__init__(self) self._name = "pilota" - self.params = [ - PARAM_ID, - ('g', 'group'), - ('u', 'unit'), - ('a', 'command') - ] + self.params = [PARAM_ID, PARAM_GROUP, PARAM_UNIT, PARAM_COMMAND] def encode(self, params, timebase=None, repetitions=None): symbols = '01' @@ -553,7 +541,6 @@ class PilotaCasa(PWM2): symbols += u symbols += "{:016b}".format(int(params["id"]))[::-1] symbols += "11111111" - print(symbols) return self._build_frame(symbols, timebase, repetitions) def decode(self, pulsetrain): @@ -601,6 +588,7 @@ class PCPIR(RcProtocol): #pilota casa PIR sensor protocols = [ Tristate(), ITTristate(), + Brennenstuhl(), Intertechno(), Hama(), Logilight(), @@ -725,7 +713,10 @@ class RcTransceiver(threading.Thread): res = [] succ = False for p in protocols: - dec = p.decode(train) + try: + dec = p.decode(train) + except: + continue if dec: print(dec) succ = True @@ -737,8 +728,8 @@ class RcTransceiver(threading.Thread): try: txdata, tb = proto.encode(params, timebase, repeats) self.__rfmtrx.send(txdata, tb) - except: - print("Encode error") + except Exception as e: + print("Encode error: " + e.message) def run(self): while True: diff --git a/apps/rcpulsegw.py b/apps/rcpulsegw.py old mode 100644 new mode 100755