From f634d234d688dd67ccf69d2bd2eecf8cb3313576 Mon Sep 17 00:00:00 2001 From: Jokob-sk Date: Sun, 20 Nov 2022 19:07:35 +1100 Subject: [PATCH] MQTT prep-work --- back/pialert.py | 147 ++++++++++++++++++++++++------------------------ 1 file changed, 72 insertions(+), 75 deletions(-) diff --git a/back/pialert.py b/back/pialert.py index 5bec7b2c..f812a22f 100755 --- a/back/pialert.py +++ b/back/pialert.py @@ -33,6 +33,7 @@ import json import requests from base64 import b64encode from paho.mqtt import client as mqtt_client +import threading # sys.stdout = open('pialert_new.log', 'w') @@ -147,15 +148,16 @@ except NameError: send_report_on_cycles = [1, "internet_IP", "update_vendors_silent"] cycle = "" network_scan_minutes = 5 +mqtt_thread_up = False # timestamps of last execution times time_now = datetime.datetime.now() -now_minus_one_day = time_now - timedelta(hours = 24) +now_minus_24h = time_now - timedelta(hours = 24) -last_network_scan = now_minus_one_day -last_internet_IP_scan = now_minus_one_day -last_run = now_minus_one_day -last_cleanup = now_minus_one_day +last_network_scan = now_minus_24h +last_internet_IP_scan = now_minus_24h +last_run = now_minus_24h +last_cleanup = now_minus_24h last_update_vendors = time_now - timedelta(days = 7) def main (): @@ -189,9 +191,16 @@ def main (): startTime = time_now startTime = startTime.replace (second=0, microsecond=0) - ## Upgrade DB if needed + # Upgrade DB if needed upgradeDB() + # Start MQTT thread if configured + # if reportMQTT and not mqtt_thread_up: + # print ('Establishing MQTT thread...') + # # start_mqtt_thread (connect_mqtt()) + # # connect_mqtt() + # mqtt_start() + # determine run/scan type based on passed time if last_internet_IP_scan + timedelta(minutes=3) < time_now: cycle = 'internet_IP' @@ -1567,11 +1576,6 @@ def email_reporting (): send_pushsafer (mail_text) else : print (' Skip PUSHSAFER...') - if reportMQTT : - print (' Establishing MQTT runtime...') - start_sending_mqtt (connect_mqtt(), json_final) - else : - print (' Skip MQTT...') else : print (' No changes to report...') @@ -1780,78 +1784,58 @@ def send_apprise (html): logResult (stdout, stderr) #------------------------------------------------------------------------------- -ConnectedMQTT = False #global variable for the state of the connection -client = mqtt_client.Client(mqttClientId) # Set Connecting Client ID -def connect_mqtt(): - if ConnectedMQTT == False: - def on_connect(client, userdata, flags, rc): - if rc == 0: - ConnectedMQTT = True - print(" Connected to MQTT Broker!") - else: - ConnectedMQTT = False - print(" Failed to connect, return code %d\n", rc) - - client.username_pw_set(mqttUser, mqttPassword) - client.on_connect = on_connect - client.connect(mqttBroker, mqttPort) - return client -#------------------------------------------------------------------------------- -def start_sending_mqtt(client, json_final): - client.loop_start() #start the loop +def mqtt_start(): + global mqtt_thread_up - index = 0 - for event in json_final['events']: - index = index + 1 - - deviceMAC = event[index][0] - deviceIP = event[index][1] - eventType = event[index][3] - deviceName = event[index][8] - deviceVendor = event[index][11] - # deviceNetworkNode = event[index][23] - + mqtt_thread_up = True - payload_str = ( - '{' - + f'"device_mac": {deviceMAC},' - + f'"device_ip": {deviceIP},' - + f'"event_type": {eventType},' - + f'"device_name": {deviceName},' - + f'"device_vendor": {deviceVendor},' - # + f'"device_network_node": "{deviceNetworkNode}"' - + "}" - ) + client = mqtt_client.Client(mqttClientId) # Set Connecting Client ID + client.username_pw_set(mqttUser, mqttPassword) + client.connect(mqttBroker, mqttPort) - client.publish( - # topic=f"system-sensors/sensor/{deviceName}/state", - topic=f"system-sensors/sensor/PiAlert/state", - payload=payload_str, + while True: + + time.sleep(15) + + # Online_Devices, Down_Devices, All_Devices, Archived_Devices + row = get_device_stats() + + client.publish("pialert/devices/online/number", + payload=row[0], qos=1, - retain=True - ) + retain=True) - client.publish( - # topic=f"system-sensors/sensor/{deviceName}/state", - topic=f"system-sensors/sensor/PiAlert2/state", - payload=payload_str, - retain=True - ) + client.publish("pialert/devices/number", + payload=row[2], + qos=1, + retain=True) + + print ('MQTT published messages') + + + client.loop() + + + +# #------------------------------------------------------------------------------- +# def start_mqtt_thread (client): +# # start a MQTT thread loop which will continuously report on devices to the broker +# # daemon=True - makes sure the thread dies with the process if interrupted +# global mqtt_thread_up + +# # flag to check if thread is running +# mqtt_thread_up = True + +# print(" Starting MQTT sending") +# # x = threading.Thread(target=start_sending_mqtt, args=(client,), daemon=True) +# start_sending_mqtt(client) + +# print(" Threading: Starting MQTT thread") + +# # x.start() - client.publish( - # topic=f"system-sensors/sensor/{deviceName}/state", - topic=f"system-sensors/sensor/PiAlert3/state", - payload='test message' - ) - - # if ConnectedMQTT: - # msg = f"PiAlert: {msg_count}" - # client.publish(mqttTopic, msg) - - # client.disconnect() - # client.loop_stop() #=============================================================================== # DB @@ -2033,6 +2017,19 @@ def to_text(_json): return payloadData +def get_device_stats(): + + openDB() + + sql.execute(""" + SELECT Online_Devices, Down_Devices, All_Devices, Archived_Devices from Online_History + """) + + row = sql.fetchone() + + closeDB() + return row + #=============================================================================== # BEGIN #===============================================================================