MQTT prep-work
This commit is contained in:
147
back/pialert.py
147
back/pialert.py
@@ -33,6 +33,7 @@ import json
|
|||||||
import requests
|
import requests
|
||||||
from base64 import b64encode
|
from base64 import b64encode
|
||||||
from paho.mqtt import client as mqtt_client
|
from paho.mqtt import client as mqtt_client
|
||||||
|
import threading
|
||||||
|
|
||||||
|
|
||||||
# sys.stdout = open('pialert_new.log', 'w')
|
# sys.stdout = open('pialert_new.log', 'w')
|
||||||
@@ -147,15 +148,16 @@ except NameError:
|
|||||||
send_report_on_cycles = [1, "internet_IP", "update_vendors_silent"]
|
send_report_on_cycles = [1, "internet_IP", "update_vendors_silent"]
|
||||||
cycle = ""
|
cycle = ""
|
||||||
network_scan_minutes = 5
|
network_scan_minutes = 5
|
||||||
|
mqtt_thread_up = False
|
||||||
|
|
||||||
# timestamps of last execution times
|
# timestamps of last execution times
|
||||||
time_now = datetime.datetime.now()
|
time_now = datetime.datetime.now()
|
||||||
now_minus_one_day = time_now - timedelta(hours = 24)
|
now_minus_24h = time_now - timedelta(hours = 24)
|
||||||
|
|
||||||
last_network_scan = now_minus_one_day
|
last_network_scan = now_minus_24h
|
||||||
last_internet_IP_scan = now_minus_one_day
|
last_internet_IP_scan = now_minus_24h
|
||||||
last_run = now_minus_one_day
|
last_run = now_minus_24h
|
||||||
last_cleanup = now_minus_one_day
|
last_cleanup = now_minus_24h
|
||||||
last_update_vendors = time_now - timedelta(days = 7)
|
last_update_vendors = time_now - timedelta(days = 7)
|
||||||
|
|
||||||
def main ():
|
def main ():
|
||||||
@@ -189,9 +191,16 @@ def main ():
|
|||||||
startTime = time_now
|
startTime = time_now
|
||||||
startTime = startTime.replace (second=0, microsecond=0)
|
startTime = startTime.replace (second=0, microsecond=0)
|
||||||
|
|
||||||
## Upgrade DB if needed
|
# Upgrade DB if needed
|
||||||
upgradeDB()
|
upgradeDB()
|
||||||
|
|
||||||
|
# Start MQTT thread if configured
|
||||||
|
# if reportMQTT and not mqtt_thread_up:
|
||||||
|
# print ('Establishing MQTT thread...')
|
||||||
|
# # start_mqtt_thread (connect_mqtt())
|
||||||
|
# # connect_mqtt()
|
||||||
|
# mqtt_start()
|
||||||
|
|
||||||
# determine run/scan type based on passed time
|
# determine run/scan type based on passed time
|
||||||
if last_internet_IP_scan + timedelta(minutes=3) < time_now:
|
if last_internet_IP_scan + timedelta(minutes=3) < time_now:
|
||||||
cycle = 'internet_IP'
|
cycle = 'internet_IP'
|
||||||
@@ -1567,11 +1576,6 @@ def email_reporting ():
|
|||||||
send_pushsafer (mail_text)
|
send_pushsafer (mail_text)
|
||||||
else :
|
else :
|
||||||
print (' Skip PUSHSAFER...')
|
print (' Skip PUSHSAFER...')
|
||||||
if reportMQTT :
|
|
||||||
print (' Establishing MQTT runtime...')
|
|
||||||
start_sending_mqtt (connect_mqtt(), json_final)
|
|
||||||
else :
|
|
||||||
print (' Skip MQTT...')
|
|
||||||
else :
|
else :
|
||||||
print (' No changes to report...')
|
print (' No changes to report...')
|
||||||
|
|
||||||
@@ -1780,78 +1784,58 @@ def send_apprise (html):
|
|||||||
logResult (stdout, stderr)
|
logResult (stdout, stderr)
|
||||||
|
|
||||||
#-------------------------------------------------------------------------------
|
#-------------------------------------------------------------------------------
|
||||||
ConnectedMQTT = False #global variable for the state of the connection
|
|
||||||
client = mqtt_client.Client(mqttClientId) # Set Connecting Client ID
|
|
||||||
|
|
||||||
def connect_mqtt():
|
|
||||||
if ConnectedMQTT == False:
|
|
||||||
def on_connect(client, userdata, flags, rc):
|
|
||||||
if rc == 0:
|
|
||||||
ConnectedMQTT = True
|
|
||||||
print(" Connected to MQTT Broker!")
|
|
||||||
else:
|
|
||||||
ConnectedMQTT = False
|
|
||||||
print(" Failed to connect, return code %d\n", rc)
|
|
||||||
|
|
||||||
client.username_pw_set(mqttUser, mqttPassword)
|
|
||||||
client.on_connect = on_connect
|
|
||||||
client.connect(mqttBroker, mqttPort)
|
|
||||||
return client
|
|
||||||
|
|
||||||
#-------------------------------------------------------------------------------
|
|
||||||
def start_sending_mqtt(client, json_final):
|
|
||||||
client.loop_start() #start the loop
|
|
||||||
|
|
||||||
index = 0
|
|
||||||
for event in json_final['events']:
|
|
||||||
index = index + 1
|
|
||||||
|
|
||||||
deviceMAC = event[index][0]
|
|
||||||
deviceIP = event[index][1]
|
|
||||||
eventType = event[index][3]
|
|
||||||
deviceName = event[index][8]
|
|
||||||
deviceVendor = event[index][11]
|
|
||||||
# deviceNetworkNode = event[index][23]
|
|
||||||
|
|
||||||
|
|
||||||
payload_str = (
|
def mqtt_start():
|
||||||
'{'
|
global mqtt_thread_up
|
||||||
+ f'"device_mac": {deviceMAC},'
|
|
||||||
+ f'"device_ip": {deviceIP},'
|
|
||||||
+ f'"event_type": {eventType},'
|
|
||||||
+ f'"device_name": {deviceName},'
|
|
||||||
+ f'"device_vendor": {deviceVendor},'
|
|
||||||
# + f'"device_network_node": "{deviceNetworkNode}"'
|
|
||||||
+ "}"
|
|
||||||
)
|
|
||||||
|
|
||||||
client.publish(
|
mqtt_thread_up = True
|
||||||
# topic=f"system-sensors/sensor/{deviceName}/state",
|
|
||||||
topic=f"system-sensors/sensor/PiAlert/state",
|
client = mqtt_client.Client(mqttClientId) # Set Connecting Client ID
|
||||||
payload=payload_str,
|
client.username_pw_set(mqttUser, mqttPassword)
|
||||||
|
client.connect(mqttBroker, mqttPort)
|
||||||
|
|
||||||
|
while True:
|
||||||
|
|
||||||
|
time.sleep(15)
|
||||||
|
|
||||||
|
# Online_Devices, Down_Devices, All_Devices, Archived_Devices
|
||||||
|
row = get_device_stats()
|
||||||
|
|
||||||
|
client.publish("pialert/devices/online/number",
|
||||||
|
payload=row[0],
|
||||||
qos=1,
|
qos=1,
|
||||||
retain=True
|
retain=True)
|
||||||
)
|
|
||||||
|
|
||||||
client.publish(
|
client.publish("pialert/devices/number",
|
||||||
# topic=f"system-sensors/sensor/{deviceName}/state",
|
payload=row[2],
|
||||||
topic=f"system-sensors/sensor/PiAlert2/state",
|
qos=1,
|
||||||
payload=payload_str,
|
retain=True)
|
||||||
retain=True
|
|
||||||
)
|
|
||||||
|
|
||||||
client.publish(
|
print ('MQTT published messages')
|
||||||
# topic=f"system-sensors/sensor/{deviceName}/state",
|
|
||||||
topic=f"system-sensors/sensor/PiAlert3/state",
|
|
||||||
payload='test message'
|
|
||||||
)
|
|
||||||
|
|
||||||
# if ConnectedMQTT:
|
|
||||||
# msg = f"PiAlert: {msg_count}"
|
|
||||||
# client.publish(mqttTopic, msg)
|
|
||||||
|
|
||||||
# client.disconnect()
|
client.loop()
|
||||||
# client.loop_stop()
|
|
||||||
|
|
||||||
|
|
||||||
|
# #-------------------------------------------------------------------------------
|
||||||
|
# def start_mqtt_thread (client):
|
||||||
|
# # start a MQTT thread loop which will continuously report on devices to the broker
|
||||||
|
# # daemon=True - makes sure the thread dies with the process if interrupted
|
||||||
|
# global mqtt_thread_up
|
||||||
|
|
||||||
|
# # flag to check if thread is running
|
||||||
|
# mqtt_thread_up = True
|
||||||
|
|
||||||
|
# print(" Starting MQTT sending")
|
||||||
|
# # x = threading.Thread(target=start_sending_mqtt, args=(client,), daemon=True)
|
||||||
|
# start_sending_mqtt(client)
|
||||||
|
|
||||||
|
# print(" Threading: Starting MQTT thread")
|
||||||
|
|
||||||
|
# # x.start()
|
||||||
|
|
||||||
|
|
||||||
#===============================================================================
|
#===============================================================================
|
||||||
# DB
|
# DB
|
||||||
@@ -2033,6 +2017,19 @@ def to_text(_json):
|
|||||||
|
|
||||||
return payloadData
|
return payloadData
|
||||||
|
|
||||||
|
def get_device_stats():
|
||||||
|
|
||||||
|
openDB()
|
||||||
|
|
||||||
|
sql.execute("""
|
||||||
|
SELECT Online_Devices, Down_Devices, All_Devices, Archived_Devices from Online_History
|
||||||
|
""")
|
||||||
|
|
||||||
|
row = sql.fetchone()
|
||||||
|
|
||||||
|
closeDB()
|
||||||
|
return row
|
||||||
|
|
||||||
#===============================================================================
|
#===============================================================================
|
||||||
# BEGIN
|
# BEGIN
|
||||||
#===============================================================================
|
#===============================================================================
|
||||||
|
|||||||
Reference in New Issue
Block a user