UNIFI, undiscoverables, docs work

This commit is contained in:
Jokob-sk
2023-09-02 13:30:19 +10:00
parent 854ef20826
commit 25b54d1f62
5 changed files with 128 additions and 152 deletions

View File

@@ -17,9 +17,9 @@
- [set_password (SETPWD)](/front/plugins/set_password/)
- [nmap_scan (NMAP)](/front/plugins/nmap_scan/)
### SQL query based plugins
### SQL query-based plugins
- N/A, but the External SQLite based plugins work very similar
- No example available, but the External SQLite based plugins work very similar
### template based plugins
@@ -610,7 +610,7 @@ The UI will adjust how columns are displayed in the UI based on the resolvers de
| `device_name_mac` | The value is considered to be a MAC address, and a link pointing to the device with the given IP is generated. The link label is resolved as the target device name. |
| `url` | The value is considered to be a URL, so a link is generated. |
| `textbox_save` | Generates an editable and saveable text box that saves values in the database. Primarily intended for the `UserData` database column in the `Plugins_Objects` table. |
| `url_http_https` | Generates towo links with the `https` and `http` prefix as lock icons. |
| `url_http_https` | Generates two links with the `https` and `http` prefix as lock icons. |
> [!NOTE]

View File

@@ -226,7 +226,7 @@
"mapped_to_column": "cur_MAC",
"css_classes": "col-sm-2",
"show": true,
"type": "device_mac",
"type": "label",
"default_value":"",
"options": [],
"localized": ["name"],
@@ -291,6 +291,23 @@
"language_code":"es_es",
"string" : "Cambiado"
}]
}
},
{
"column": "ForeignKey",
"css_classes": "col-sm-2",
"show": false,
"type": "device_mac",
"default_value":"",
"options": [],
"localized": ["name"],
"name":[{
"language_code":"en_us",
"string" : "MAC"
},
{
"language_code":"es_es",
"string" : "MAC"
}]
}
]
}

View File

@@ -5,6 +5,7 @@ import os
import pathlib
import argparse
import sys
import hashlib
sys.path.append("/home/pi/pialert/front/plugins")
sys.path.append('/home/pi/pialert/pialert')
@@ -31,6 +32,9 @@ def main():
if values.devices:
for fake_dev in values.devices.split('=')[1].split(','):
fake_mac = string_to_mac_hash(fake_dev)
plugin_objects.add_object(
primaryId=fake_dev, # MAC (Device Name)
secondaryId="0.0.0.0", # IP Address (always 0.0.0.0)
@@ -39,12 +43,21 @@ def main():
watched3="",
watched4="",
extra="",
foreignKey="")
foreignKey=fake_mac)
plugin_objects.write_result_file()
return 0
def string_to_mac_hash(input_string):
# Calculate a hash using SHA-256
sha256_hash = hashlib.sha256(input_string.encode()).hexdigest()
# Take the first 12 characters of the hash and format as a MAC address
mac_hash = ':'.join(sha256_hash[i:i+2] for i in range(0, 12, 2))
return mac_hash
#===============================================================================
# BEGIN
#===============================================================================

View File

@@ -2,102 +2,91 @@
# Inspired by https://github.com/stevehoek/Pi.Alert
# Example call
# python3 /home/pi/pialert/front/plugins/unifi_import/script.py username=pialert password=passw0rd host=192.168.1.1 site=default protocol=https:// port=8443 version='UDMP-unifiOS'
# python3 /home/pi/pialert/front/plugins/unifi_import/script.py username=pialert password=passw0rd host=192.168.1.1 site=default protocol=https port=8443 version='UDMP-unifiOS'
# python3 /home/pi/pialert/front/plugins/unifi_import/script.py username=pialert password=passw0rd host=192.168.1.1 sites=sdefault port=8443 verifyssl=false version=v5
from __future__ import unicode_literals
from time import sleep, time, strftime
import requests
from requests import Request, Session, packages
import pathlib
import threading
import subprocess
import socket
import json
from time import strftime
import argparse
import logging
import pathlib
import os
import json
import sys
import requests
from requests import Request, Session, packages
from requests.packages.urllib3.exceptions import InsecureRequestWarning
from pyunifi.controller import Controller
# Add your paths here
sys.path.append("/home/pi/pialert/front/plugins")
sys.path.append('/home/pi/pialert/pialert')
curPath = str(pathlib.Path(__file__).parent.resolve())
log_file = curPath + '/script.log'
last_run = curPath + '/last_result.log'
logging.basicConfig(
filename=log_file,
level=logging.INFO,
format='%(asctime)s:%(levelname)s:%(name)s:%(message)s'
)
unifi_logger = logging.getLogger('[UNIFI]')
unifi_logger.setLevel(logging.INFO)
from plugin_helper import Plugin_Object, Plugin_Objects
from logger import mylog
CUR_PATH = str(pathlib.Path(__file__).parent.resolve())
LOG_FILE = os.path.join(CUR_PATH, 'script.log')
RESULT_FILE = os.path.join(CUR_PATH, 'last_result.log')
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
# Workflow
def main():
def main():
mylog('verbose', ['[UNFIMP] In script'])
unifi_logger.info('Start scan')
# init global variables
global UNIFI_USERNAME, UNIFI_PASSWORD, UNIFI_HOST, UNIFI_SITES, PORT, VERIFYSSL, VERSION
# empty file
unifi_logger.debug('Purging old values')
with open(last_run, 'w') as last_run_logfile:
last_run_logfile.write("")
parser = argparse.ArgumentParser(description='Import devices from a UNIFI controller')
parser = argparse.ArgumentParser(description='Import devices from an UNIFI controller')
parser.add_argument('username', action="store", help="Username used to login into the UNIFI controller")
parser.add_argument('password', action="store", help="Password used to login into the UNIFI controller")
parser.add_argument('host', action="store", help="Host url or IP address where the UNIFI controller is hosted (excluding http://)")
parser.add_argument('sites', action="store", help="Name of the sites (usually 'default', check the URL in your UniFi controller UI). Separated by comma (,) if passing multiple sites")
parser.add_argument('port', action="store", help="Usually 8443")
parser.add_argument('verifyssl', action="store", help="verify SSL certificate [true|false]")
parser.add_argument('version', action="store", help="The base version of the controller API [v4|v5|unifiOS|UDMP-unifiOS]")
parser.add_argument('username', action="store", help="Username used to login into the UNIFI controller")
parser.add_argument('password', action="store", help="Password used to login into the UNIFI controller")
parser.add_argument('host', action="store", help="Host url or IP address where the UNIFI controller is hosted (excluding http://)")
parser.add_argument('sites', action="store", help="Name of the sites (usually 'default', check the URL in your UniFi controller UI). Separated by comma (,) if passing multiple sites")
parser.add_argument('port', action="store", help="Usually 8443")
parser.add_argument('verifyssl', action="store", help="verify SSL certificate [true|false]")
parser.add_argument('version', action="store", help="The base version of the controller API [v4|v5|unifiOS|UDMP-unifiOS]")
values = parser.parse_args()
# parse output
newEntries = []
plugin_objects = Plugin_Objects(RESULT_FILE)
mylog('verbose', [f'[UNFIMP] Check if all login information is available: {values}'])
unifi_logger.debug(f'Check if all login information is available: {values}')
if values.username and values.password and values.host and values.sites:
UNIFI_USERNAME = values.username.split('=')[1]
UNIFI_USERNAME = values.username.split('=')[1]
UNIFI_PASSWORD = values.password.split('=')[1]
UNIFI_HOST = values.host.split('=')[1]
UNIFI_SITES = values.sites.split('=')[1]
UNIFI_HOST = values.host.split('=')[1]
UNIFI_SITES = values.sites.split('=')[1]
PORT = values.port.split('=')[1]
VERIFYSSL = values.verifyssl.split('=')[1]
VERSION = values.version.split('=')[1]
newEntries = get_entries(newEntries)
plugin_objects = get_entries(plugin_objects)
unifi_logger.debug(f'Print {len(newEntries)} to monitoring log')
for e in newEntries:
# Insert list into the log
service_monitoring_log(e.primaryId, e.secondaryId, e.created, e.watched1, e.watched2, e.watched3, e.watched4, e.extra, e.foreignKey )
plugin_objects.write_result_file()
unifi_logger.info(f'Scan finished, found {len(newEntries)} devices')
mylog('verbose', [f'[UNFIMP] Scan finished, found {len(plugin_objects)} devices'])
# -----------------------------------------------------------------------------
def get_entries(newEntries):
# .............................................
def get_entries(plugin_objects):
global VERIFYSSL
sites = []
if ',' in UNIFI_SITES:
sites = UNIFI_SITES.split(',')
else:
sites.append(UNIFI_SITES)
@@ -108,13 +97,13 @@ def get_entries(newEntries):
for site in sites:
c = Controller(UNIFI_HOST, UNIFI_USERNAME, UNIFI_PASSWORD, port=PORT, version=VERSION, ssl_verify=VERIFYSSL, site_id=site )
unifi_logger.debug('identify Unifi Devices')
c = Controller(UNIFI_HOST, UNIFI_USERNAME, UNIFI_PASSWORD, port=PORT, version=VERSION, ssl_verify=VERIFYSSL, site_id=site)
mylog('verbose', [f'[UNFIMP] Identify Unifi Devices'])
# get all Unifi devices
for ap in c.get_aps():
# print(f'{json.dumps(ap)}')
# mylog('verbose', [f'{json.dumps(ap)}'])
deviceType = ''
if (ap['type'] == 'udm'):
@@ -131,37 +120,35 @@ def get_entries(newEntries):
name = set_name(name, hostName)
tmpPlugObj = plugin_object_class(
ap['mac'],
get_unifi_val(ap, 'ip'),
name,
'Ubiquiti Networks Inc.',
deviceType,
ap['state'],
get_unifi_val(ap, 'connection_network_name')
)
plugin_objects.add_object(
primaryId=ap['mac'],
secondaryId=get_unifi_val(ap, 'ip'),
watched1=name,
watched2='Ubiquiti Networks Inc.',
watched3=deviceType,
watched4=ap['state'],
extra=get_unifi_val(ap, 'connection_network_name')
)
newEntries.append(tmpPlugObj)
unifi_logger.debug(f'Found {len(newEntries)} Unifi Devices')
# print(f'>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>')
mylog('verbose', [f'[UNFIMP] Found {len(plugin_objects)} Unifi Devices'])
online_macs = set()
# get_clients() returns all clients which are currently online.
for cl in c.get_clients():
# print(f'{json.dumps(cl)}')
# mylog('verbose', [f'{json.dumps(cl)}'])
online_macs.add(cl['mac'])
unifi_logger.debug(f'Found {len(online_macs)} Online Clients')
# print(f'>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>')
mylog('verbose', [f'[UNFIMP] Found {len(plugin_objects)} Online Devices'])
# get_users() returns all clients known by the controller
for user in c.get_users():
# print(f'{json.dumps(user)}')
mylog('verbose', [f'{json.dumps(user)}'])
name = get_unifi_val(user, 'name')
hostName = get_unifi_val(user, 'hostname')
@@ -172,21 +159,25 @@ def get_entries(newEntries):
if status == 1:
tmpPlugObj = plugin_object_class(
user['mac'],
get_unifi_val(user, 'last_ip'),
name,
get_unifi_val(user, 'oui'),
'Other',
status,
get_unifi_val(user, 'last_connection_network_name')
ipTmp = get_unifi_val(user, 'last_ip')
if ipTmp == 'null':
ipTmp = get_unifi_val(user, 'fixed_ip')
plugin_objects.add_object(
primaryId=user['mac'],
secondaryId=ipTmp,
watched1=name,
watched2=get_unifi_val(user, 'oui'),
watched3='Other',
watched4=status,
extra=get_unifi_val(user, 'last_connection_network_name')
)
newEntries.append(tmpPlugObj)
unifi_logger.debug(f'Found {len(newEntries)} Clients overall')
return newEntries
mylog('verbose', [f'[UNFIMP] Found {len(plugin_objects)} Clients overall'])
return plugin_objects
# -----------------------------------------------------------------------------
def get_unifi_val(obj, key):
@@ -214,60 +205,8 @@ def set_name(name: str, hostName: str) -> str:
else:
return 'null'
# -------------------------------------------------------------------
class plugin_object_class:
def __init__(self, primaryId = '',secondaryId = '', watched1 = '',watched2 = '',watched3 = '',watched4 = '',extra = '',foreignKey = ''):
self.pluginPref = ''
self.primaryId = primaryId
self.secondaryId = secondaryId
self.created = strftime("%Y-%m-%d %H:%M:%S")
self.changed = ''
self.watched1 = watched1
self.watched2 = watched2
self.watched3 = watched3
self.watched4 = watched4
self.status = ''
self.extra = extra
self.userData = ''
self.foreignKey = foreignKey
# -----------------------------------------------------------------------------
def service_monitoring_log(primaryId, secondaryId, created, watched1, watched2 = 'null', watched3 = 'null', watched4 = 'null', extra ='null', foreignKey ='null'):
if watched1 == '':
watched1 = 'null'
if watched2 == '':
watched2 = 'null'
if watched3 == '':
watched3 = 'null'
if watched4 == '':
watched4 = 'null'
if extra == '':
extra = 'null'
if foreignKey == '':
foreignKey = 'null'
unifi_logger.debug(f'Adding entry to monitoring log:\n{primaryId}, {secondaryId}, {created}, {watched1}, {watched2}, {watched3}, {watched4}, {extra}')
with open(last_run, 'a') as last_run_logfile:
last_run_logfile.write("{}|{}|{}|{}|{}|{}|{}|{}|{}\n".format(
primaryId,
secondaryId,
created,
watched1,
watched2,
watched3,
watched4,
extra,
foreignKey
)
)
#===============================================================================
# BEGIN
#===============================================================================
if __name__ == '__main__':
main()
if __name__ == '__main__':
main()