From d520104288ef51619dfeee96af346440e970b0c2 Mon Sep 17 00:00:00 2001 From: "S. Seegel" Date: Tue, 29 Dec 2020 00:18:03 +0100 Subject: [PATCH] refactoring --- apps/apiserver.py | 69 ++++++++++++++++++++++++++++++++++++++++++++++ apps/lacrossegw.py | 54 ++++++++++++++++++++++-------------- apps/rcpulsegw.py | 63 +++++++++++------------------------------- 3 files changed, 119 insertions(+), 67 deletions(-) create mode 100644 apps/apiserver.py diff --git a/apps/apiserver.py b/apps/apiserver.py new file mode 100644 index 0000000..608ef18 --- /dev/null +++ b/apps/apiserver.py @@ -0,0 +1,69 @@ +import threading +import socket +import json + +clients = [] +lock = threading.Lock() + +class clientthread(threading.Thread): + def __init__(self, socket, cb): + self.__socket = socket + self.__cb = cb + threading.Thread.__init__(self) + + def send(self, data): + self.__socket.sendall(data) + + def run(self): + buf = "" + while True: + chunk = self.__socket.recv(32) + if len(chunk) == 0: + del self.__socket + break + lock.acquire() + try: + chunk = chunk.decode() + buf += chunk + while "\n" in buf: + line = buf[:buf.find("\n")] + buf = buf[buf.find("\n") + 1:] + d = json.loads(line) + if self.__cb: + self.__cb(d) + #rctrx.send(d["protocol"], d["params"]) + except: + pass + lock.release() + + lock.acquire() + clients.remove(self) + lock.release() + +class ApiServer(threading.Thread): + def __init__(self, port, cb = None): + self.__srvsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self.__srvsock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + self.__srvsock.bind(('', port)) + self.__srvsock.listen(5) + self.__cb = cb + threading.Thread.__init__(self) + self.daemon = True + self.start() + + def run(self): + while True: + (client, address) = self.__srvsock.accept() + ct = clientthread(client, self.__cb) + ct.daemon = True + ct.start() + lock.acquire() + clients.append(ct) + lock.release() + + def send(self, dict): + data = (json.dumps(dict) + "\n").encode() + lock.acquire() + for client in clients: + client.send(data) + lock.release() \ No newline at end of file diff --git a/apps/lacrossegw.py b/apps/lacrossegw.py index baeb0fc..75c1f8f 100755 --- a/apps/lacrossegw.py +++ b/apps/lacrossegw.py @@ -9,6 +9,8 @@ import threading import math import json from datetime import datetime +import apiserver +import os try: #python2.7 @@ -24,7 +26,8 @@ except ImportError: lock = threading.Lock() -with open("lacrossegw.conf") as jfile: +script_dir = os.path.dirname(__file__) +with open(script_dir + "/lacrossegw.conf") as jfile: config = json.load(jfile) if raspyrfm.raspyrfm_test(2, raspyrfm.RFM69): @@ -112,7 +115,7 @@ def writeInflux(payload): "fields": { "T": T }, - "tags": {"sensor": payload["ID"] if not ("room" in payload) else payload["room"]} + "tags": {"sensor": payload["id"] if not ("room" in payload) else payload["room"]} } if ("RH" in payload): @@ -182,7 +185,7 @@ class MyHttpRequestHandler(Handler): else: sensor = {} sensor["room"] = csens["name"] - sensor["ID"] = id + sensor["id"] = id resp["sensors"].append(sensor) for id in cache: @@ -234,6 +237,8 @@ server_thread = threading.Thread(target=server.serve_forever) server_thread.daemon = True server_thread.start() +apisrv = apiserver.ApiServer(1990) + print("Waiting for sensors...") while 1: @@ -243,8 +248,8 @@ while 1: sensorObj = sensors.rawsensor.CreateSensor(rxObj) sensorData = sensorObj.GetData() payload = {} - ID = sensorData["ID"] - payload["ID"] = ID + id = sensorData["ID"] + payload["id"] = id T = sensorData["T"][0] payload["T"] = T except: @@ -256,7 +261,7 @@ while 1: payload["init"] = sensorData["init"] lock.acquire() for csens in config["sensors"]: - if sensorData['ID'] == csens["id"]: + if id == csens["id"]: payload["room"] = csens["name"] break @@ -282,26 +287,35 @@ while 1: v = math.log10(DD/6.1078) payload["DEW60"] = round(b*v/(a-v), 1) - if not ID in cache: - cache[ID] = {} - cache[ID]["count"] = 1 - cache[ID]["payload"] = payload - cache[ID]["payload"]["tMin"] = T - cache[ID]["payload"]["tMax"] = T + apipayl = { + "decode": [{ + "protocol": "lacrosse", + "params": payload, + "class": "weather" + }] + } + apisrv.send(apipayl) + + if not id in cache: + cache[id] = {} + cache[id]["count"] = 1 + cache[id]["payload"] = payload + cache[id]["payload"]["tMin"] = T + cache[id]["payload"]["tMax"] = T else: - payload["tMin"] = cache[ID]["payload"]["tMin"] - payload["tMax"] = cache[ID]["payload"]["tMax"] + payload["tMin"] = cache[id]["payload"]["tMin"] + payload["tMax"] = cache[id]["payload"]["tMax"] if payload["tMin"] > T: payload["tMin"] = T if payload["tMax"] < T: payload["tMax"] = T - cache[ID]["payload"] = payload + cache[id]["payload"] = payload - cache[ID]["ts"] = datetime.now() - cache[ID]["count"] += 1 + cache[id]["ts"] = datetime.now() + cache[id]["count"] += 1 - line = u" ID: {:2} ".format(ID); + line = u" id: {:2} ".format(id) line += u'room {:12} '.format(payload["room"][:12] if ("room" in payload) else "---") line += u' T: {:5} \u00b0C '.format(payload["T"]) if "RH" in payload: @@ -310,7 +324,7 @@ while 1: line += "init: " + ("true " if payload["init"] else "false ") print('------------------------------------------------------------------------------') - print(line) + print(line).encode("utf-8") lock.release() try: @@ -321,6 +335,6 @@ while 1: try: if mqttClient: - mqttClient.publish('home/lacrosse/'+ payload['ID'], json.dumps(payload)) + mqttClient.publish('home/lacrosse/'+ payload['id'], json.dumps(payload)) except: pass diff --git a/apps/rcpulsegw.py b/apps/rcpulsegw.py index d79d1db..8ca3bbe 100755 --- a/apps/rcpulsegw.py +++ b/apps/rcpulsegw.py @@ -1,25 +1,18 @@ #!/usr/bin/env python -import socket -import threading from raspyrfm import * import rcprotocols import json from argparse import ArgumentParser +import apiserver +import time parser = ArgumentParser() parser.add_argument("-m", "--module", type=int, metavar="1-4", help=u"RaspyRFM module 1-4", default=1) parser.add_argument("-f", "--frequency", type=float, help=u"frequency in MHz", default=433.92) args = parser.parse_args() -srvsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) -srvsock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) -srvsock.bind(('', 1989)) -srvsock.listen(5) - -clients = [] - -def rxcb(dec, train): +def rccb(dec, train): payload = None if dec is None: payload = {"raw": train} @@ -29,50 +22,26 @@ def rxcb(dec, train): if payload is not None: print("RX", payload) - s = json.dumps(payload) + "\n" - for client in clients: - client.send(s) + try: + apisrv.send(payload) + except: + pass if not raspyrfm_test(args.module, RFM69): print("Error! RaspyRFM not found") exit() -rctrx = rcprotocols.RcTransceiver(args.module, args.frequency, rxcb) -lock = threading.Lock() +rctrx = rcprotocols.RcTransceiver(args.module, args.frequency, rccb) -class clientthread(threading.Thread): - def __init__(self, socket): - self.__socket = socket - threading.Thread.__init__(self) +def apicb(data): + print("TX", data) + try: + rctrx.send(data["protocol"], data["params"]) + except: + pass - def send(self, s): - self.__socket.sendall(s.encode()) - - def run(self): - buf = "" - while True: - chunk = self.__socket.recv(32).decode() - if len(chunk) == 0: - del self.__socket - break - lock.acquire() - try: - buf += chunk - while "\n" in buf: - line = buf[:buf.find("\n")] - buf = buf[buf.find("\n") + 1:] - d = json.loads(line) - print("TX", d) - rctrx.send(d["protocol"], d["params"]) - except: - pass - lock.release() - clients.remove(self) +apisrv = apiserver.ApiServer(1989, apicb) while True: - (client, address) = srvsock.accept() - ct = clientthread(client) - ct.daemon = True - ct.start() - clients.append(ct) + time.sleep(1)