diff --git a/front/plugins/pihole_scan/README.md b/front/plugins/pihole_scan/README.md index 622c5306..28385052 100755 --- a/front/plugins/pihole_scan/README.md +++ b/front/plugins/pihole_scan/README.md @@ -1,27 +1,15 @@ ## Overview -A plugin allowing for importing Un-Discoverable devices from the settings page. -The main usecase is to add dumb network gear like unmanaged hubs and switches to the network view. -There might be other usecases, please let me know. +A plugin allowing for importing devices from the PiHole database. This is an import plugin using an SQLite database as a source. + ### Usage -- Go to settings and find Un-Discoverabe Devices in the list of plugins. -- Enable the plugin by changing the RUN parameter from disabled to `once` or `always_after_scan`. -- Add the name of your device to the list. (remove the sample entry first) -- SAVE -- wait for the next scan to finish +- TBD #### Examples: -Settings: -![settings](https://github.com/Data-Monkey/Pi.Alert/assets/7224371/52883307-19a5-4602-b13a-9825461f6cc4) -resulting in these devices: -![devices](https://github.com/Data-Monkey/Pi.Alert/assets/7224371/9f7659e7-75a8-4ae9-9f5f-781bdbcbc949) - -Allowing Un-Discoverable devices like hubs, switches or APs to be added to the network view. -![network](https://github.com/Data-Monkey/Pi.Alert/assets/7224371/b5ccc3b3-f5fd-4f5b-b0f0-e4e637c6da33) +TBD ### Known Limitations - - Un-Discoverable Devices always show as offline. That is expected as they can not be discovered by Pi.Alert. - - All IPs are set to 0.0.0.0 therefore the "Random MAC" icon might show up. + - TBD diff --git a/front/plugins/pihole_scan/script.py b/front/plugins/pihole_scan/script.py deleted file mode 100644 index 62890d9c..00000000 --- a/front/plugins/pihole_scan/script.py +++ /dev/null @@ -1,224 +0,0 @@ -#!/usr/bin/env python - -import sqlite3 -import os -import pathlib -import argparse -import sys -import re -import base64 -import subprocess -from time import strftime - -sys.path.append("/home/pi/pialert/front/plugins") - -from plugin_helper import Plugin_Object, Plugin_Objects - -""" module to import db and leases from PiHole """ - -piholeDB = '/etc/pihole/pihole-FTL.db' - -pialertPath = '/home/pi/pialert' -dbPath = '/db/pialert.db' -fullPiAlertDbPath = pialertPath + dbPath - -CUR_PATH = str(pathlib.Path(__file__).parent.resolve()) -LOG_FILE = os.path.join(CUR_PATH, 'script.log') -RESULT_FILE = os.path.join(CUR_PATH, 'last_result.log') - - -def main(): - - - fullPiAlertDbPath - - # # sample - # # /home/pi/pialert/front/plugins/arp_scan/script.py userSubnets=b'MTkyLjE2OC4xLjAvMjQgLS1pbnRlcmZhY2U9ZXRoMQ==' - # # the script expects a parameter in the format of userSubnets=subnet1,subnet2,... - # parser = argparse.ArgumentParser(description='Import devices from settings') - # parser.add_argument('userSubnets', nargs='+', help="list of subnets with options") - # values = parser.parse_args() - - # # Assuming Plugin_Objects is a class or function that reads data from the RESULT_FILE - # # and returns a list of objects called 'devices'. - # devices = Plugin_Objects(RESULT_FILE) - - # # Print a message to indicate that the script is starting. - # print('In script:') - - # # Assuming 'values' is a dictionary or object that contains a key 'userSubnets' - # # which holds a list of user-submitted subnets. - # # Printing the userSubnets list to check its content. - # print(values.userSubnets) - - # # Extract the base64-encoded subnet information from the first element of the userSubnets list. - # # The format of the element is assumed to be like 'userSubnets=b'. - # userSubnetsParamBase64 = values.userSubnets[0].split('userSubnets=b')[1] - - # # Printing the extracted base64-encoded subnet information. - # print(userSubnetsParamBase64) - - # # Decode the base64-encoded subnet information to get the actual subnet information in ASCII format. - # userSubnetsParam = base64.b64decode(userSubnetsParamBase64).decode('ascii') - - # # Print the decoded subnet information. - # print('userSubnetsParam:') - # print(userSubnetsParam) - - # # Check if the decoded subnet information contains multiple subnets separated by commas. - # # If it does, split the string into a list of individual subnets. - # # Otherwise, create a list with a single element containing the subnet information. - # if ',' in userSubnetsParam: - # subnets_list = userSubnetsParam.split(',') - # else: - # subnets_list = [userSubnetsParam] - - # # Execute the ARP scanning process on the list of subnets (whether it's one or multiple subnets). - # # The function 'execute_arpscan' is assumed to be defined elsewhere in the code. - # unique_devices = execute_arpscan(subnets_list) - - - # for device in unique_devices: - # devices.add_object( - # primaryId=device['mac'], # MAC (Device Name) - # secondaryId=device['ip'], # IP Address - # watched1=device['ip'], # Device Name - # watched2=device.get('hw', ''), # Vendor (assuming it's in the 'hw' field) - # watched3=device.get('interface', ''), # Add the interface - # watched4='', - # extra='arp-scan', - # foreignKey="") - - # devices.write_result_file() - - # return 0 - - -def execute_arpscan(userSubnets): - # output of possible multiple interfaces - arpscan_output = "" - devices_list = [] - - # scan each interface - - for interface in userSubnets : - - arpscan_output = execute_arpscan_on_interface (interface) - - print(arpscan_output) - - # Search IP + MAC + Vendor as regular expresion - re_ip = r'(?P((2[0-5]|1[0-9]|[0-9])?[0-9]\.){3}((2[0-5]|1[0-9]|[0-9])?[0-9]))' - re_mac = r'(?P([0-9a-fA-F]{2}[:-]){5}([0-9a-fA-F]{2}))' - re_hw = r'(?P.*)' - re_pattern = re.compile (re_ip + '\s+' + re_mac + '\s' + re_hw) - - devices_list_tmp = [ - {**device.groupdict(), "interface": interface} - for device in re.finditer(re_pattern, arpscan_output) - ] - - devices_list += devices_list_tmp - - # mylog('debug', ['[ARP Scan] Found: Devices including duplicates ', len(devices_list) ]) - - # Delete duplicate MAC - unique_mac = [] - unique_devices = [] - - for device in devices_list : - if device['mac'] not in unique_mac: - unique_mac.append(device['mac']) - unique_devices.append(device) - - # return list - # mylog('debug', ['[ARP Scan] Found: Devices without duplicates ', len(unique_devices) ]) - - print("Devices List len:", len(devices_list)) # Add this line to print devices_list - print("Devices List:", devices_list) # Add this line to print devices_list - - return devices_list - - -def execute_arpscan_on_interface(interface): - # Prepare command arguments - arpscan_args = ['sudo', 'arp-scan', '--ignoredups', '--retry=6'] + interface.split() - - # Execute command - try: - # try running a subprocess safely - result = subprocess.check_output(arpscan_args, universal_newlines=True) - except subprocess.CalledProcessError as e: - # An error occurred, handle it - error_type = type(e).__name__ # Capture the error type - result = "" - - return result - - -#------------------------------------------------------------------------------- -def copy_pihole_network (db): - """ - attach the PiHole Database and copy the PiHole_Network table accross into the PiAlert DB - """ - - sql = db.sql # TO-DO - # Open Pi-hole DB - print('[PiHole Network] - attach PiHole DB') - - try: - sql.execute ("ATTACH DATABASE '"+ piholeDB +"' AS PH") - except sqlite3.Error as e: - print(f'[PiHole Network] - SQL ERROR: {e}') - - - # Copy Pi-hole Network table - - try: - sql.execute ("DELETE FROM PiHole_Network") - - # just for reporting - new_devices = [] - sql.execute ( """SELECT hwaddr, macVendor, lastQuery, - (SELECT name FROM PH.network_addresses - WHERE network_id = id ORDER BY lastseen DESC, ip), - (SELECT ip FROM PH.network_addresses - WHERE network_id = id ORDER BY lastseen DESC, ip) - FROM PH.network - WHERE hwaddr NOT LIKE 'ip-%' - AND hwaddr <> '00:00:00:00:00:00' """) - new_devices = sql.fetchall() - - # insert into PiAlert DB - sql.execute ("""INSERT INTO PiHole_Network (PH_MAC, PH_Vendor, PH_LastQuery, - PH_Name, PH_IP) - SELECT hwaddr, macVendor, lastQuery, - (SELECT name FROM PH.network_addresses - WHERE network_id = id ORDER BY lastseen DESC, ip), - (SELECT ip FROM PH.network_addresses - WHERE network_id = id ORDER BY lastseen DESC, ip) - FROM PH.network - WHERE hwaddr NOT LIKE 'ip-%' - AND hwaddr <> '00:00:00:00:00:00' """) - sql.execute ("""UPDATE PiHole_Network SET PH_Name = '(unknown)' - WHERE PH_Name IS NULL OR PH_Name = '' """) - # Close Pi-hole DB - sql.execute ("DETACH PH") - - except sqlite3.Error as e: - print(f'[PiHole Network] - SQL ERROR: {e}') - - db.commitDB() - - print('[PiHole Network] - completed - found ', len(new_devices), ' devices') - return str(sql.rowcount) != "0" - - -#------------------------------------------------------------------------------- - - -#=============================================================================== -# BEGIN -#=============================================================================== -if __name__ == '__main__': - main()