refactoring

This commit is contained in:
S. Seegel 2020-12-29 00:18:03 +01:00
parent 01ddd7e3cc
commit d520104288
3 changed files with 119 additions and 67 deletions

69
apps/apiserver.py Normal file
View file

@ -0,0 +1,69 @@
import threading
import socket
import json
clients = []
lock = threading.Lock()
class clientthread(threading.Thread):
def __init__(self, socket, cb):
self.__socket = socket
self.__cb = cb
threading.Thread.__init__(self)
def send(self, data):
self.__socket.sendall(data)
def run(self):
buf = ""
while True:
chunk = self.__socket.recv(32)
if len(chunk) == 0:
del self.__socket
break
lock.acquire()
try:
chunk = chunk.decode()
buf += chunk
while "\n" in buf:
line = buf[:buf.find("\n")]
buf = buf[buf.find("\n") + 1:]
d = json.loads(line)
if self.__cb:
self.__cb(d)
#rctrx.send(d["protocol"], d["params"])
except:
pass
lock.release()
lock.acquire()
clients.remove(self)
lock.release()
class ApiServer(threading.Thread):
def __init__(self, port, cb = None):
self.__srvsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.__srvsock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.__srvsock.bind(('', port))
self.__srvsock.listen(5)
self.__cb = cb
threading.Thread.__init__(self)
self.daemon = True
self.start()
def run(self):
while True:
(client, address) = self.__srvsock.accept()
ct = clientthread(client, self.__cb)
ct.daemon = True
ct.start()
lock.acquire()
clients.append(ct)
lock.release()
def send(self, dict):
data = (json.dumps(dict) + "\n").encode()
lock.acquire()
for client in clients:
client.send(data)
lock.release()

View file

@ -9,6 +9,8 @@ import threading
import math import math
import json import json
from datetime import datetime from datetime import datetime
import apiserver
import os
try: try:
#python2.7 #python2.7
@ -24,7 +26,8 @@ except ImportError:
lock = threading.Lock() lock = threading.Lock()
with open("lacrossegw.conf") as jfile: script_dir = os.path.dirname(__file__)
with open(script_dir + "/lacrossegw.conf") as jfile:
config = json.load(jfile) config = json.load(jfile)
if raspyrfm.raspyrfm_test(2, raspyrfm.RFM69): if raspyrfm.raspyrfm_test(2, raspyrfm.RFM69):
@ -112,7 +115,7 @@ def writeInflux(payload):
"fields": { "fields": {
"T": T "T": T
}, },
"tags": {"sensor": payload["ID"] if not ("room" in payload) else payload["room"]} "tags": {"sensor": payload["id"] if not ("room" in payload) else payload["room"]}
} }
if ("RH" in payload): if ("RH" in payload):
@ -182,7 +185,7 @@ class MyHttpRequestHandler(Handler):
else: else:
sensor = {} sensor = {}
sensor["room"] = csens["name"] sensor["room"] = csens["name"]
sensor["ID"] = id sensor["id"] = id
resp["sensors"].append(sensor) resp["sensors"].append(sensor)
for id in cache: for id in cache:
@ -234,6 +237,8 @@ server_thread = threading.Thread(target=server.serve_forever)
server_thread.daemon = True server_thread.daemon = True
server_thread.start() server_thread.start()
apisrv = apiserver.ApiServer(1990)
print("Waiting for sensors...") print("Waiting for sensors...")
while 1: while 1:
@ -243,8 +248,8 @@ while 1:
sensorObj = sensors.rawsensor.CreateSensor(rxObj) sensorObj = sensors.rawsensor.CreateSensor(rxObj)
sensorData = sensorObj.GetData() sensorData = sensorObj.GetData()
payload = {} payload = {}
ID = sensorData["ID"] id = sensorData["ID"]
payload["ID"] = ID payload["id"] = id
T = sensorData["T"][0] T = sensorData["T"][0]
payload["T"] = T payload["T"] = T
except: except:
@ -256,7 +261,7 @@ while 1:
payload["init"] = sensorData["init"] payload["init"] = sensorData["init"]
lock.acquire() lock.acquire()
for csens in config["sensors"]: for csens in config["sensors"]:
if sensorData['ID'] == csens["id"]: if id == csens["id"]:
payload["room"] = csens["name"] payload["room"] = csens["name"]
break break
@ -282,26 +287,35 @@ while 1:
v = math.log10(DD/6.1078) v = math.log10(DD/6.1078)
payload["DEW60"] = round(b*v/(a-v), 1) payload["DEW60"] = round(b*v/(a-v), 1)
if not ID in cache: apipayl = {
cache[ID] = {} "decode": [{
cache[ID]["count"] = 1 "protocol": "lacrosse",
cache[ID]["payload"] = payload "params": payload,
cache[ID]["payload"]["tMin"] = T "class": "weather"
cache[ID]["payload"]["tMax"] = T }]
}
apisrv.send(apipayl)
if not id in cache:
cache[id] = {}
cache[id]["count"] = 1
cache[id]["payload"] = payload
cache[id]["payload"]["tMin"] = T
cache[id]["payload"]["tMax"] = T
else: else:
payload["tMin"] = cache[ID]["payload"]["tMin"] payload["tMin"] = cache[id]["payload"]["tMin"]
payload["tMax"] = cache[ID]["payload"]["tMax"] payload["tMax"] = cache[id]["payload"]["tMax"]
if payload["tMin"] > T: if payload["tMin"] > T:
payload["tMin"] = T payload["tMin"] = T
if payload["tMax"] < T: if payload["tMax"] < T:
payload["tMax"] = T payload["tMax"] = T
cache[ID]["payload"] = payload cache[id]["payload"] = payload
cache[ID]["ts"] = datetime.now() cache[id]["ts"] = datetime.now()
cache[ID]["count"] += 1 cache[id]["count"] += 1
line = u" ID: {:2} ".format(ID); line = u" id: {:2} ".format(id)
line += u'room {:12} '.format(payload["room"][:12] if ("room" in payload) else "---") line += u'room {:12} '.format(payload["room"][:12] if ("room" in payload) else "---")
line += u' T: {:5} \u00b0C '.format(payload["T"]) line += u' T: {:5} \u00b0C '.format(payload["T"])
if "RH" in payload: if "RH" in payload:
@ -310,7 +324,7 @@ while 1:
line += "init: " + ("true " if payload["init"] else "false ") line += "init: " + ("true " if payload["init"] else "false ")
print('------------------------------------------------------------------------------') print('------------------------------------------------------------------------------')
print(line) print(line).encode("utf-8")
lock.release() lock.release()
try: try:
@ -321,6 +335,6 @@ while 1:
try: try:
if mqttClient: if mqttClient:
mqttClient.publish('home/lacrosse/'+ payload['ID'], json.dumps(payload)) mqttClient.publish('home/lacrosse/'+ payload['id'], json.dumps(payload))
except: except:
pass pass

View file

@ -1,25 +1,18 @@
#!/usr/bin/env python #!/usr/bin/env python
import socket
import threading
from raspyrfm import * from raspyrfm import *
import rcprotocols import rcprotocols
import json import json
from argparse import ArgumentParser from argparse import ArgumentParser
import apiserver
import time
parser = ArgumentParser() parser = ArgumentParser()
parser.add_argument("-m", "--module", type=int, metavar="1-4", help=u"RaspyRFM module 1-4", default=1) parser.add_argument("-m", "--module", type=int, metavar="1-4", help=u"RaspyRFM module 1-4", default=1)
parser.add_argument("-f", "--frequency", type=float, help=u"frequency in MHz", default=433.92) parser.add_argument("-f", "--frequency", type=float, help=u"frequency in MHz", default=433.92)
args = parser.parse_args() args = parser.parse_args()
srvsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) def rccb(dec, train):
srvsock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
srvsock.bind(('', 1989))
srvsock.listen(5)
clients = []
def rxcb(dec, train):
payload = None payload = None
if dec is None: if dec is None:
payload = {"raw": train} payload = {"raw": train}
@ -29,50 +22,26 @@ def rxcb(dec, train):
if payload is not None: if payload is not None:
print("RX", payload) print("RX", payload)
s = json.dumps(payload) + "\n" try:
for client in clients: apisrv.send(payload)
client.send(s) except:
pass
if not raspyrfm_test(args.module, RFM69): if not raspyrfm_test(args.module, RFM69):
print("Error! RaspyRFM not found") print("Error! RaspyRFM not found")
exit() exit()
rctrx = rcprotocols.RcTransceiver(args.module, args.frequency, rxcb) rctrx = rcprotocols.RcTransceiver(args.module, args.frequency, rccb)
lock = threading.Lock()
class clientthread(threading.Thread): def apicb(data):
def __init__(self, socket): print("TX", data)
self.__socket = socket
threading.Thread.__init__(self)
def send(self, s):
self.__socket.sendall(s.encode())
def run(self):
buf = ""
while True:
chunk = self.__socket.recv(32).decode()
if len(chunk) == 0:
del self.__socket
break
lock.acquire()
try: try:
buf += chunk rctrx.send(data["protocol"], data["params"])
while "\n" in buf:
line = buf[:buf.find("\n")]
buf = buf[buf.find("\n") + 1:]
d = json.loads(line)
print("TX", d)
rctrx.send(d["protocol"], d["params"])
except: except:
pass pass
lock.release()
clients.remove(self) apisrv = apiserver.ApiServer(1989, apicb)
while True: while True:
(client, address) = srvsock.accept() time.sleep(1)
ct = clientthread(client)
ct.daemon = True
ct.start()
clients.append(ct)