add Brennenstuhl RCS1000
refactoring
This commit is contained in:
parent
bc095e6ab7
commit
9812b0ff7c
2 changed files with 89 additions and 98 deletions
|
@ -5,6 +5,12 @@ import threading
|
||||||
import time
|
import time
|
||||||
|
|
||||||
PARAM_ID = ('i', 'id')
|
PARAM_ID = ('i', 'id')
|
||||||
|
PARAM_HOUSE = ('o', 'house')
|
||||||
|
PARAM_GROUP = ('g', 'group')
|
||||||
|
PARAM_UNIT = ('u', 'unit')
|
||||||
|
PARAM_COMMAND = ('a', 'command')
|
||||||
|
PARAM_CODE = ('c', 'code')
|
||||||
|
PARAM_DIPS = ('d', 'dips')
|
||||||
|
|
||||||
class RcProtocol:
|
class RcProtocol:
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
|
@ -105,6 +111,18 @@ class RcProtocol:
|
||||||
|
|
||||||
return None, None, None
|
return None, None, None
|
||||||
|
|
||||||
|
def _encode_command(self, command):
|
||||||
|
if command in self._commands:
|
||||||
|
return self._commands[command]
|
||||||
|
else:
|
||||||
|
raise Exception("Invalid command")
|
||||||
|
|
||||||
|
def _decode_command(self, symbols):
|
||||||
|
for k in self._commands:
|
||||||
|
if self._commands[k] == symbols:
|
||||||
|
return k
|
||||||
|
raise Exception("Unknown command")
|
||||||
|
|
||||||
def _build_frame(self, symbols, timebase=None, repetitions=None):
|
def _build_frame(self, symbols, timebase=None, repetitions=None):
|
||||||
self._reset()
|
self._reset()
|
||||||
|
|
||||||
|
@ -159,9 +177,7 @@ class Tristate(TristateBase): #old intertechno
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
self._name = "tristate"
|
self._name = "tristate"
|
||||||
TristateBase.__init__(self)
|
TristateBase.__init__(self)
|
||||||
self.params = [
|
self.params = [PARAM_CODE]
|
||||||
('c', 'code'),
|
|
||||||
]
|
|
||||||
|
|
||||||
def encode(self, params, timebase=None, repetitions=None):
|
def encode(self, params, timebase=None, repetitions=None):
|
||||||
return self._build_frame(params["code"], timebase, repetitions)
|
return self._build_frame(params["code"], timebase, repetitions)
|
||||||
|
@ -175,12 +191,8 @@ class ITTristate(TristateBase): #old intertechno
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
self._name = "ittristate"
|
self._name = "ittristate"
|
||||||
TristateBase.__init__(self)
|
TristateBase.__init__(self)
|
||||||
self.params = [
|
self.params = [PARAM_HOUSE, PARAM_GROUP, PARAM_UNIT, PARAM_COMMAND]
|
||||||
('o', 'house'), #A-P
|
self._commands = {"on": "FF", "off": "F0"}
|
||||||
('g', 'group'), #1-4
|
|
||||||
('u', 'unit'), #1-4
|
|
||||||
('a', 'command'), #on|off
|
|
||||||
]
|
|
||||||
|
|
||||||
def encode(self, params, timebase=None, repetitions=None):
|
def encode(self, params, timebase=None, repetitions=None):
|
||||||
symbols = ""
|
symbols = ""
|
||||||
|
@ -188,7 +200,7 @@ class ITTristate(TristateBase): #old intertechno
|
||||||
symbols += self._encode_int(int(params["unit"]) - 1, 2)
|
symbols += self._encode_int(int(params["unit"]) - 1, 2)
|
||||||
symbols += self._encode_int(int(params["group"]) - 1, 2)
|
symbols += self._encode_int(int(params["group"]) - 1, 2)
|
||||||
symbols += "0f"
|
symbols += "0f"
|
||||||
symbols += "ff" if params["command"] == "on" else "f0"
|
symbols += self._encode_command(params["command"])
|
||||||
return self._build_frame(symbols, timebase, repetitions)
|
return self._build_frame(symbols, timebase, repetitions)
|
||||||
|
|
||||||
def decode(self, pulsetrain):
|
def decode(self, pulsetrain):
|
||||||
|
@ -198,7 +210,43 @@ class ITTristate(TristateBase): #old intertechno
|
||||||
"house": chr(self._decode_int(symbols[:4]) + ord('A')),
|
"house": chr(self._decode_int(symbols[:4]) + ord('A')),
|
||||||
"unit": self._decode_int(symbols[4:6]) + 1,
|
"unit": self._decode_int(symbols[4:6]) + 1,
|
||||||
"group": self._decode_int(symbols[6:8]) + 1,
|
"group": self._decode_int(symbols[6:8]) + 1,
|
||||||
"command": "on" if symbols[10:12].upper() == "FF" else "off",
|
"command": self._decode_command(symbols[10:12].upper()),
|
||||||
|
}
|
||||||
|
return self._name, params, tb, rep
|
||||||
|
|
||||||
|
class Brennenstuhl(TristateBase): #old intertechno
|
||||||
|
def __init__(self):
|
||||||
|
self._name = "brennenstuhl"
|
||||||
|
TristateBase.__init__(self)
|
||||||
|
self.params = [PARAM_DIPS, PARAM_UNIT, PARAM_COMMAND]
|
||||||
|
self._commands = {"on": "0F", "off": "F0"}
|
||||||
|
|
||||||
|
def encode(self, params, timebase=None, repetitions=None):
|
||||||
|
symbols = ""
|
||||||
|
for c in params["dips"]:
|
||||||
|
symbols += '0' if c == '1' else 'F'
|
||||||
|
for i in range(4):
|
||||||
|
symbols += '0' if (int(params["unit"]) - 1) == i else 'F'
|
||||||
|
symbols += "F"
|
||||||
|
symbols += self._encode_command(params["command"])
|
||||||
|
return self._build_frame(symbols, timebase, repetitions)
|
||||||
|
|
||||||
|
def decode(self, pulsetrain):
|
||||||
|
symbols, tb, rep = self._decode_symbols(pulsetrain[0:-2])
|
||||||
|
if symbols:
|
||||||
|
dips = ""
|
||||||
|
for s in symbols[0:5]:
|
||||||
|
dips += "1" if s == '0' else "0"
|
||||||
|
unit = 0
|
||||||
|
for u in symbols[5:9]:
|
||||||
|
unit += 1
|
||||||
|
if u == '0':
|
||||||
|
break
|
||||||
|
|
||||||
|
params = {
|
||||||
|
"dips": dips,
|
||||||
|
"unit": unit,
|
||||||
|
"command": self._decode_command(symbols[10:12].upper()),
|
||||||
}
|
}
|
||||||
return self._name, params, tb, rep
|
return self._name, params, tb, rep
|
||||||
|
|
||||||
|
@ -226,11 +274,8 @@ class Intertechno(PPM1):
|
||||||
PPM1.__init__(self)
|
PPM1.__init__(self)
|
||||||
self._name = "intertechno"
|
self._name = "intertechno"
|
||||||
self._timebase = 275
|
self._timebase = 275
|
||||||
self.params = [
|
self.params = [PARAM_ID, PARAM_UNIT, PARAM_COMMAND]
|
||||||
PARAM_ID,
|
self._commands = {"on": "1", "off": "0"}
|
||||||
('u', 'unit'),
|
|
||||||
('a', 'command'),
|
|
||||||
]
|
|
||||||
|
|
||||||
def _encode_unit(self, unit):
|
def _encode_unit(self, unit):
|
||||||
return "{:04b}".format(int(unit) - 1)
|
return "{:04b}".format(int(unit) - 1)
|
||||||
|
@ -242,7 +287,7 @@ class Intertechno(PPM1):
|
||||||
symbols = ""
|
symbols = ""
|
||||||
symbols += "{:026b}".format(int(params["id"]))
|
symbols += "{:026b}".format(int(params["id"]))
|
||||||
symbols += "0" #group
|
symbols += "0" #group
|
||||||
symbols += "1" if params["command"] == "on" else "0"
|
symbols += self._encode_command(params["command"])
|
||||||
symbols += self._encode_unit(params["unit"])
|
symbols += self._encode_unit(params["unit"])
|
||||||
return self._build_frame(symbols, timebase, repetitions)
|
return self._build_frame(symbols, timebase, repetitions)
|
||||||
|
|
||||||
|
@ -252,7 +297,7 @@ class Intertechno(PPM1):
|
||||||
params = {
|
params = {
|
||||||
"id": int(symbols[:26], 2),
|
"id": int(symbols[:26], 2),
|
||||||
"unit": self._decode_unit(symbols[28:32]),
|
"unit": self._decode_unit(symbols[28:32]),
|
||||||
"command": "on" if symbols[27] == "1" else "off"
|
"command": self._decode_command(symbols[27])
|
||||||
}
|
}
|
||||||
return self._name, params, tb, rep
|
return self._name, params, tb, rep
|
||||||
|
|
||||||
|
@ -291,11 +336,8 @@ class Logilight(PWM1):
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
PWM1.__init__(self)
|
PWM1.__init__(self)
|
||||||
self._name = "logilight"
|
self._name = "logilight"
|
||||||
self.params = [
|
self.params = [PARAM_ID, PARAM_UNIT, PARAM_COMMAND]
|
||||||
PARAM_ID,
|
self._commands = {"on": "1", "learn": "1", "off": "0"}
|
||||||
('u', 'unit'),
|
|
||||||
('a', 'command'),
|
|
||||||
]
|
|
||||||
|
|
||||||
def _encode_unit(self, unit):
|
def _encode_unit(self, unit):
|
||||||
res = ""
|
res = ""
|
||||||
|
@ -317,7 +359,7 @@ class Logilight(PWM1):
|
||||||
def encode(self, params, timebase=None, repetitions=None):
|
def encode(self, params, timebase=None, repetitions=None):
|
||||||
symbols = ""
|
symbols = ""
|
||||||
symbols += "{:020b}".format(int(params["id"]))
|
symbols += "{:020b}".format(int(params["id"]))
|
||||||
symbols += "1" if params["command"] in ["on", "learn"] else "0"
|
symbols += self._encode_command(params["command"])
|
||||||
symbols += self._encode_unit(params["unit"])
|
symbols += self._encode_unit(params["unit"])
|
||||||
if (params["command"] == "learn"):
|
if (params["command"] == "learn"):
|
||||||
repetitions = 10
|
repetitions = 10
|
||||||
|
@ -329,7 +371,7 @@ class Logilight(PWM1):
|
||||||
params = {
|
params = {
|
||||||
"id": int(symbols[:20], 2),
|
"id": int(symbols[:20], 2),
|
||||||
"unit": self._decode_unit(symbols[21:24]),
|
"unit": self._decode_unit(symbols[21:24]),
|
||||||
"command": "on" if symbols[20] == "1" else "off"
|
"command": self._decode_command(symbols[20])
|
||||||
}
|
}
|
||||||
return self._name, params, tb, rep
|
return self._name, params, tb, rep
|
||||||
|
|
||||||
|
@ -337,34 +379,21 @@ class Emylo(PWM1):
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
PWM1.__init__(self)
|
PWM1.__init__(self)
|
||||||
self._name = "emylo"
|
self._name = "emylo"
|
||||||
self.params = [
|
self.params = [PARAM_ID, PARAM_COMMAND]
|
||||||
PARAM_ID,
|
self._commands = {'A': '0001', 'B': '0010', 'C': '0100', 'D': '1000'}
|
||||||
('k', 'key'),
|
|
||||||
]
|
|
||||||
self.__keys = {'A': '0001', 'B': '0010', 'C': '0100', 'D': '1000'}
|
|
||||||
|
|
||||||
def encode(self, params, timebase=None, repetitions=None):
|
def encode(self, params, timebase=None, repetitions=None):
|
||||||
symbols = ""
|
symbols = ""
|
||||||
symbols += "{:020b}".format(int(params["id"]))
|
symbols += "{:020b}".format(int(params["id"]))
|
||||||
if not params["key"] in self.__keys:
|
symbols += self._encode_command(params["command"])
|
||||||
return
|
|
||||||
symbols += self.__keys[params["key"]]
|
|
||||||
return self._build_frame(symbols, timebase, repetitions)
|
return self._build_frame(symbols, timebase, repetitions)
|
||||||
|
|
||||||
def decode(self, pulsetrain):
|
def decode(self, pulsetrain):
|
||||||
symbols, tb, rep = self._decode_symbols(pulsetrain[:-2])
|
symbols, tb, rep = self._decode_symbols(pulsetrain[:-2])
|
||||||
if symbols:
|
if symbols:
|
||||||
key = None
|
|
||||||
for k in self.__keys:
|
|
||||||
if self.__keys[k] == symbols[-4:]:
|
|
||||||
key = k
|
|
||||||
break
|
|
||||||
if key is None:
|
|
||||||
return
|
|
||||||
|
|
||||||
params = {
|
params = {
|
||||||
"id": int(symbols[:20], 2),
|
"id": int(symbols[:20], 2),
|
||||||
"key": key,
|
"command": self._decode_command(symbols[-4:])
|
||||||
}
|
}
|
||||||
return self._name, params, tb, rep
|
return self._name, params, tb, rep
|
||||||
|
|
||||||
|
@ -380,11 +409,7 @@ class FS20(RcProtocol):
|
||||||
}
|
}
|
||||||
self._header = [2, 2] * 12 + [3, 3]
|
self._header = [2, 2] * 12 + [3, 3]
|
||||||
self._footer = [1, 100]
|
self._footer = [1, 100]
|
||||||
self.params = [
|
self.params = [PARAM_ID,PARAM_UNIT, PARAM_COMMAND]
|
||||||
PARAM_ID,
|
|
||||||
('u', 'unit'),
|
|
||||||
('a', 'command')
|
|
||||||
]
|
|
||||||
RcProtocol.__init__(self)
|
RcProtocol.__init__(self)
|
||||||
|
|
||||||
def __encode_byte(self, b):
|
def __encode_byte(self, b):
|
||||||
|
@ -426,7 +451,6 @@ class Voltcraft(RcProtocol):
|
||||||
Pulse in middle of a symbol: 0, end of symbol: 1
|
Pulse in middle of a symbol: 0, end of symbol: 1
|
||||||
Used by Voltcraft RC30
|
Used by Voltcraft RC30
|
||||||
'''
|
'''
|
||||||
__commands = {0: "off", 1: "alloff", 2: "on", 3: "allon", 5: "dimup", 7: "dimdown"}
|
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
self._name = "voltcraft"
|
self._name = "voltcraft"
|
||||||
self._timebase = 600
|
self._timebase = 600
|
||||||
|
@ -438,28 +462,19 @@ class Voltcraft(RcProtocol):
|
||||||
}
|
}
|
||||||
self._header = [1]
|
self._header = [1]
|
||||||
self._footer = [132]
|
self._footer = [132]
|
||||||
self.params = [
|
self.params = [PARAM_ID, PARAM_UNIT, PARAM_COMMAND]
|
||||||
PARAM_ID,
|
self._commands = {"off": "000", "alloff": "100", "on": "010", "allon": "110", "dimup": "101", "dimdown": "111"}
|
||||||
('u', 'unit'),
|
|
||||||
('a', 'command')
|
|
||||||
]
|
|
||||||
RcProtocol.__init__(self)
|
RcProtocol.__init__(self)
|
||||||
|
|
||||||
def encode(self, params, timebase=None, repetitions=None):
|
def encode(self, params, timebase=None, repetitions=None):
|
||||||
cmd = None
|
if params["command"] in ["on", "off"]:
|
||||||
for k in self.__commands:
|
|
||||||
if self.__commands[k] == params["command"]:
|
|
||||||
cmd = k
|
|
||||||
break
|
|
||||||
|
|
||||||
if not (cmd in [0, 2]):
|
|
||||||
unit = 3
|
|
||||||
else:
|
|
||||||
unit = int(params["unit"])-1
|
unit = int(params["unit"])-1
|
||||||
|
else:
|
||||||
|
unit = 3
|
||||||
|
|
||||||
symbols = "{:012b}".format(int(params["id"]))[::-1]
|
symbols = "{:012b}".format(int(params["id"]))[::-1]
|
||||||
symbols += "{:02b}".format(unit)[::-1]
|
symbols += "{:02b}".format(unit)[::-1]
|
||||||
symbols += "{:03b}".format(cmd)[::-1]
|
symbols += self._encode_command(params["command"])
|
||||||
symbols += "0"
|
symbols += "0"
|
||||||
symbols += "1" if (symbols[12] == "1") ^ (symbols[14] == "1") ^ (symbols[16] == "1") else "0"
|
symbols += "1" if (symbols[12] == "1") ^ (symbols[14] == "1") ^ (symbols[16] == "1") else "0"
|
||||||
symbols += "1" if (symbols[13] == "1") ^ (symbols[15] == "1") ^ (symbols[17] == "1") else "0"
|
symbols += "1" if (symbols[13] == "1") ^ (symbols[15] == "1") ^ (symbols[17] == "1") else "0"
|
||||||
|
@ -471,7 +486,7 @@ class Voltcraft(RcProtocol):
|
||||||
params = {
|
params = {
|
||||||
"id": int(symbols[0:12][::-1], 2),
|
"id": int(symbols[0:12][::-1], 2),
|
||||||
"unit": int(symbols[12:14][::-1], 2) + 1,
|
"unit": int(symbols[12:14][::-1], 2) + 1,
|
||||||
"command": self.__commands[int(symbols[14:17][::-1], 2)]
|
"command": self._decode_command(symbols[14:17])
|
||||||
}
|
}
|
||||||
return self._name, params, tb, rep
|
return self._name, params, tb, rep
|
||||||
|
|
||||||
|
@ -492,31 +507,9 @@ class PWM2(RcProtocol):
|
||||||
'0': [2, 1],
|
'0': [2, 1],
|
||||||
}
|
}
|
||||||
self._footer = [1, 11]
|
self._footer = [1, 11]
|
||||||
self.params = [
|
self.params = [PARAM_CODE]
|
||||||
('c', 'code')
|
|
||||||
]
|
|
||||||
RcProtocol.__init__(self)
|
RcProtocol.__init__(self)
|
||||||
|
|
||||||
def encode(self, params, timebase=None, repetitions=None):
|
|
||||||
symbols = params["code"]
|
|
||||||
return self._build_frame(symbols, timebase, repetitions)
|
|
||||||
|
|
||||||
def decode(self, pulsetrain):
|
|
||||||
symbols, tb, rep = self._decode_symbols(pulsetrain[:-2])
|
|
||||||
|
|
||||||
if symbols:
|
|
||||||
x1 = symbols[2:8]
|
|
||||||
x2 = x1[5] + x1[4] + x1[3] + x1[2] + x1[1] + x1[0]
|
|
||||||
print(symbols, symbols[2:4], symbols[4:8], int(x1, 2), int(x2, 2))
|
|
||||||
return
|
|
||||||
|
|
||||||
if symbols:
|
|
||||||
return {
|
|
||||||
"protocol": self._name,
|
|
||||||
"code": code,
|
|
||||||
"timebase": tb,
|
|
||||||
}, rep
|
|
||||||
|
|
||||||
class PilotaCasa(PWM2):
|
class PilotaCasa(PWM2):
|
||||||
__codes = {
|
__codes = {
|
||||||
'110001': (1, 1, 'on'), '111110': (1, 1, 'off'),
|
'110001': (1, 1, 'on'), '111110': (1, 1, 'off'),
|
||||||
|
@ -536,12 +529,7 @@ class PilotaCasa(PWM2):
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
PWM2.__init__(self)
|
PWM2.__init__(self)
|
||||||
self._name = "pilota"
|
self._name = "pilota"
|
||||||
self.params = [
|
self.params = [PARAM_ID, PARAM_GROUP, PARAM_UNIT, PARAM_COMMAND]
|
||||||
PARAM_ID,
|
|
||||||
('g', 'group'),
|
|
||||||
('u', 'unit'),
|
|
||||||
('a', 'command')
|
|
||||||
]
|
|
||||||
|
|
||||||
def encode(self, params, timebase=None, repetitions=None):
|
def encode(self, params, timebase=None, repetitions=None):
|
||||||
symbols = '01'
|
symbols = '01'
|
||||||
|
@ -553,7 +541,6 @@ class PilotaCasa(PWM2):
|
||||||
symbols += u
|
symbols += u
|
||||||
symbols += "{:016b}".format(int(params["id"]))[::-1]
|
symbols += "{:016b}".format(int(params["id"]))[::-1]
|
||||||
symbols += "11111111"
|
symbols += "11111111"
|
||||||
print(symbols)
|
|
||||||
return self._build_frame(symbols, timebase, repetitions)
|
return self._build_frame(symbols, timebase, repetitions)
|
||||||
|
|
||||||
def decode(self, pulsetrain):
|
def decode(self, pulsetrain):
|
||||||
|
@ -601,6 +588,7 @@ class PCPIR(RcProtocol): #pilota casa PIR sensor
|
||||||
protocols = [
|
protocols = [
|
||||||
Tristate(),
|
Tristate(),
|
||||||
ITTristate(),
|
ITTristate(),
|
||||||
|
Brennenstuhl(),
|
||||||
Intertechno(),
|
Intertechno(),
|
||||||
Hama(),
|
Hama(),
|
||||||
Logilight(),
|
Logilight(),
|
||||||
|
@ -725,7 +713,10 @@ class RcTransceiver(threading.Thread):
|
||||||
res = []
|
res = []
|
||||||
succ = False
|
succ = False
|
||||||
for p in protocols:
|
for p in protocols:
|
||||||
|
try:
|
||||||
dec = p.decode(train)
|
dec = p.decode(train)
|
||||||
|
except:
|
||||||
|
continue
|
||||||
if dec:
|
if dec:
|
||||||
print(dec)
|
print(dec)
|
||||||
succ = True
|
succ = True
|
||||||
|
@ -737,8 +728,8 @@ class RcTransceiver(threading.Thread):
|
||||||
try:
|
try:
|
||||||
txdata, tb = proto.encode(params, timebase, repeats)
|
txdata, tb = proto.encode(params, timebase, repeats)
|
||||||
self.__rfmtrx.send(txdata, tb)
|
self.__rfmtrx.send(txdata, tb)
|
||||||
except:
|
except Exception as e:
|
||||||
print("Encode error")
|
print("Encode error: " + e.message)
|
||||||
|
|
||||||
def run(self):
|
def run(self):
|
||||||
while True:
|
while True:
|
||||||
|
|
0
apps/rcpulsegw.py
Normal file → Executable file
0
apps/rcpulsegw.py
Normal file → Executable file
Loading…
Reference in a new issue