diff --git a/dockerfiles/setup.sh b/dockerfiles/setup.sh index 2be6e732..96dcaa47 100755 --- a/dockerfiles/setup.sh +++ b/dockerfiles/setup.sh @@ -93,10 +93,10 @@ fi # Create an empty log files # Create the execution_queue.log and app_front.log files if they don't exist touch "${INSTALL_DIR}"/front/log/{app.log,execution_queue.log,app_front.log,app.php_errors.log,stderr.log,stdout.log,db_is_locked.log} -touch "${INSTALL_DIR}"/api/{user_notifications.json} +touch "${INSTALL_DIR}"/front/api/{user_notifications.json} echo "[INSTALL] Fixing permissions after copied starter config & DB" -chown -R nginx:www-data "${INSTALL_DIR}"/{config,front/log,db} +chown -R nginx:www-data "${INSTALL_DIR}"/{config,front/log,db,front/api} chmod 750 "${INSTALL_DIR}"/{config,front/log,db} find "${INSTALL_DIR}"/{config,front/log,db} -type f -exec chmod 640 {} \; diff --git a/front/plugins/sync/config.json b/front/plugins/sync/config.json index 8e13c4a3..0f67bff4 100755 --- a/front/plugins/sync/config.json +++ b/front/plugins/sync/config.json @@ -379,6 +379,25 @@ } ] }, + { + "function": "devices", + "type": "boolean", + "default_value": false, + "options": [], + "localized": ["name", "description"], + "name": [ + { + "language_code": "en_us", + "string": "Send Devices" + } + ], + "description": [ + { + "language_code": "en_us", + "string": "When enabled the whole Devices table is sent over. Only new devices with new MACs are inserted in the target hub." + } + ] + }, { "function": "plugins", "type": "text.multiselect", @@ -402,7 +421,7 @@ "name": [ { "language_code": "en_us", - "string": "Plugins" + "string": "Send Plugins" } ], "description": [ diff --git a/front/plugins/sync/sync.py b/front/plugins/sync/sync.py index 5c754946..0b0449d8 100755 --- a/front/plugins/sync/sync.py +++ b/front/plugins/sync/sync.py @@ -5,6 +5,8 @@ import pathlib import sys import hashlib import requests +import json +import sqlite3 # Define the installation path and extend the system path for plugin imports @@ -12,8 +14,9 @@ INSTALL_PATH = "/app" sys.path.extend([f"{INSTALL_PATH}/front/plugins", f"{INSTALL_PATH}/server"]) from plugin_helper import Plugin_Object, Plugin_Objects, decodeBase64 -from plugin_utils import get_plugins_configs +from plugin_utils import get_plugins_configs, decode_and_rename_files from logger import mylog +from const import pluginsPath, fullDbPath from helper import timeNowTZ, get_setting_value from cryptography import encrypt_data @@ -22,21 +25,21 @@ CUR_PATH = str(pathlib.Path(__file__).parent.resolve()) LOG_FILE = os.path.join(CUR_PATH, 'script.log') RESULT_FILE = os.path.join(CUR_PATH, 'last_result.log') +# Initialize the Plugin obj output file +plugin_objects = Plugin_Objects(RESULT_FILE) + pluginName = 'SYNC' def main(): mylog('verbose', [f'[{pluginName}] In script']) - - # Initialize the Plugin obj output file - plugin_objects = Plugin_Objects(RESULT_FILE) - # Retrieve configuration settings plugins_to_sync = get_setting_value('SYNC_plugins') api_token = get_setting_value('SYNC_api_token') encryption_key = get_setting_value('SYNC_encryption_key') hub_url = get_setting_value('SYNC_hub_url') node_name = get_setting_value('SYNC_node_name') + send_devices = get_setting_value('SYNC_devices') # Get all plugin configurations all_plugins = get_plugins_configs() @@ -44,6 +47,7 @@ def main(): mylog('verbose', [f'[{pluginName}] DEBUG {len(all_plugins)}']) mylog('verbose', [f'[{pluginName}] plugins_to_sync {plugins_to_sync}']) + # Plugins processing index = 0 for plugin in all_plugins: pref = plugin["unique_prefix"] @@ -63,47 +67,130 @@ def main(): mylog('verbose', [f'[{pluginName}] Sending file_content: "{file_content}"']) - # Encrypt the log data using the encryption_key - encrypted_data = encrypt_data(file_content, encryption_key) + # encrypt and send data to the hub + send_data(api_token, file_content, encryption_key, plugin_folder, node_name, pref, hub_url) - mylog('verbose', [f'[{pluginName}] Sending encrypted_data: "{encrypted_data}"']) - - # Prepare the data payload for the POST request - data = { - 'data': encrypted_data, - 'plugin_folder': plugin_folder, - 'node_name': node_name - } - - # Set the authorization header with the API token - headers = {'Authorization': f'Bearer {api_token}'} - api_endpoint = f"{hub_url}/plugins/sync/hub.php" - response = requests.post(api_endpoint, data=data, headers=headers) - - mylog('verbose', [f'[{pluginName}] response: "{response}"']) - - if response.status_code == 200: - mylog('verbose', [f'[{pluginName}] Data for "{plugin_folder}" sent successfully']) - else: - mylog('verbose', [f'[{pluginName}] Failed to send data for "{plugin_folder}"']) - - # log result - plugin_objects.add_object( - primaryId = pref, - secondaryId = timeNowTZ(), - watched1 = node_name, - watched2 = response.status_code, - watched3 = response, - watched4 = '', - extra = '', - foreignKey = '') else: mylog('verbose', [f'[{pluginName}] {plugin_folder}/last_result.log not found']) + # Devices procesing + if send_devices: + + file_path = f"{INSTALL_PATH}/front/api/table_devices.json" + plugin_folder = 'sync' + pref = 'SYNC' + + if os.path.exists(file_path): + # Read the content of the log file + with open(file_path, 'r') as f: + file_content = f.read() + + mylog('verbose', [f'[{pluginName}] Sending file_content: "{file_content}"']) + send_data(api_token, file_content, encryption_key, plugin_folder, node_name, pref, hub_url) + + # process any received data for the Device DB table + # Create the file path + file_dir = os.path.join(pluginsPath, 'sync') + file_prefix = 'last_result' + + # Decode files, rename them, and get the list of files + files_to_process = decode_and_rename_files(file_dir, file_prefix) + + # Connect to the App database + conn = sqlite3.connect(fullDbPath) + cursor = conn.cursor() + + # Collect all unique dev_MAC values from the JSON files + unique_mac_addresses = set() + device_data = [] + + for file_path in files_to_process: + + # only process received .log files, skipping the one logging the progress of this plugin + if file_path != 'last_result.log': + mylog('verbose', [f'[{pluginName}] Processing: "{file_path}"']) + + # Store e.g. Node_1 from last_result.encoded.Node_1.1.log + tmp_SyncHubNodeName = '' + if len(filename.split('.')) > 3: + tmp_SyncHubNodeName = filename.split('.')[2] + + with open(file_path, 'r') as f: + data = json.load(f) + for device in data['data']: + device['dev_SyncHubNodeName'] = tmp_SyncHubNodeName + unique_mac_addresses.add(device['dev_MAC']) + device_data.append(device) + + if len(device_data) > 0: + # Retrieve existing dev_MAC values from the Devices table + placeholders = ', '.join('?' for _ in unique_mac_addresses) + cursor.execute(f'SELECT dev_MAC FROM Devices WHERE dev_MAC IN ({placeholders})', tuple(unique_mac_addresses)) + existing_mac_addresses = set(row[0] for row in cursor.fetchall()) + + # Filter out existing devices + new_devices = [device for device in device_data if device['dev_MAC'] not in existing_mac_addresses] + + # Prepare the insert statement + if new_devices: + columns = ', '.join(new_devices[0].keys()) + placeholders = ', '.join('?' for _ in new_devices[0]) + sql = f'INSERT INTO Devices ({columns}) VALUES ({placeholders})' + + # Extract values for the new devices + values = [tuple(device.values()) for device in new_devices] + + mylog('verbose', [f'[{pluginName}] Inserting Devices SQL : "{sql}"']) + mylog('verbose', [f'[{pluginName}] Inserting Devices VALUES: "{values}"']) + + # Use executemany for batch insertion + cursor.executemany(sql, values) + + # Commit and close the connection + conn.commit() + conn.close() + # log result plugin_objects.write_result_file() return 0 + +def send_data(api_token, file_content, encryption_key, plugin_folder, node_name, pref, hub_url): + # Encrypt the log data using the encryption_key + encrypted_data = encrypt_data(file_content, encryption_key) + + mylog('verbose', [f'[{pluginName}] Sending encrypted_data: "{encrypted_data}"']) + + # Prepare the data payload for the POST request + data = { + 'data': encrypted_data, + 'plugin_folder': plugin_folder, + 'node_name': node_name + } + # Set the authorization header with the API token + headers = {'Authorization': f'Bearer {api_token}'} + api_endpoint = f"{hub_url}/plugins/sync/hub.php" + response = requests.post(api_endpoint, data=data, headers=headers) + + mylog('verbose', [f'[{pluginName}] response: "{response}"']) + + if response.status_code == 200: + mylog('verbose', [f'[{pluginName}] Data for "{plugin_folder}" sent successfully']) + else: + mylog('verbose', [f'[{pluginName}] Failed to send data for "{plugin_folder}"']) + + # log result + plugin_objects.add_object( + primaryId = pref, + secondaryId = timeNowTZ(), + watched1 = node_name, + watched2 = response.status_code, + watched3 = response, + watched4 = '', + extra = '', + foreignKey = '') + + if __name__ == '__main__': main() diff --git a/server/plugin.py b/server/plugin.py index 558d26cc..7728f1bf 100755 --- a/server/plugin.py +++ b/server/plugin.py @@ -13,7 +13,7 @@ from const import pluginsPath, logPath, applicationPath, reportTemplatesPath from logger import mylog from helper import timeNowTZ, updateState, get_file_content, write_file, get_setting, get_setting_value from api import update_api -from plugin_utils import logEventStatusCounts, get_plugin_string, get_plugin_setting_obj, print_plugin_info, list_to_csv, combine_plugin_objects, resolve_wildcards_arr, handle_empty, custom_plugin_decoder +from plugin_utils import logEventStatusCounts, get_plugin_string, get_plugin_setting_obj, print_plugin_info, list_to_csv, combine_plugin_objects, resolve_wildcards_arr, handle_empty, custom_plugin_decoder, decode_and_rename_files from notification import Notification_obj from cryptography import decrypt_data @@ -211,17 +211,15 @@ def execute_plugin(db, all_plugins, plugin, pluginsState = plugins_state() ): mylog('debug', ['[Plugins] Resolved : ', command]) try: - # try runnning a subprocess with a forced timeout in case the subprocess hangs - output = subprocess.check_output (command, universal_newlines=True, stderr=subprocess.STDOUT, timeout=(set_RUN_TIMEOUT)) + # try running a subprocess with a forced timeout in case the subprocess hangs + output = subprocess.check_output(command, universal_newlines=True, stderr=subprocess.STDOUT, timeout=(set_RUN_TIMEOUT)) except subprocess.CalledProcessError as e: - # An error occured, handle it + # An error occurred, handle it mylog('none', [e.output]) mylog('none', ['[Plugins] ⚠ ERROR - enable LOG_LEVEL=debug and check logs']) except subprocess.TimeoutExpired as timeErr: mylog('none', [f'[Plugins] ⚠ ERROR - TIMEOUT - the plugin {plugin["unique_prefix"]} forcefully terminated as timeout reached. Increase TIMEOUT setting and scan interval.']) - - # check the last run output # Initialize newLines newLines = [] @@ -229,85 +227,52 @@ def execute_plugin(db, all_plugins, plugin, pluginsState = plugins_state() ): file_dir = os.path.join(pluginsPath, plugin["code_name"]) file_prefix = 'last_result' - # key to decrypt data if available - encryption_key = get_setting_value('SYNC_encryption_key') - # Check for files starting with the specified prefix - matching_files = [f for f in os.listdir(file_dir) if f.startswith(file_prefix)] + # Decode files, rename them, and get the list of files + files_to_process = decode_and_rename_files(file_dir, file_prefix) - for filename in matching_files: - # Create the full file path - file_path = os.path.join(file_dir, filename) + for filename in files_to_process: + # Open the decrypted file and process its contents + with open(os.path.join(file_dir, filename), 'r') as f: + newLines = f.read().split('\n') + + # if the script produced some output, clean it up to ensure it's the correct format + # cleanup - select only lines containing a separator to filter out unnecessary data + newLines = list(filter(lambda x: '|' in x, newLines)) + + # Store e.g. Node_1 from last_result.encoded.Node_1.1.log + tmp_SyncHubNodeName = '' + if len(filename.split('.')) > 3: + tmp_SyncHubNodeName = filename.split('.')[2] - # Check if the file exists - if os.path.exists(file_path): - - tmp_SyncHubNodeName = 'null' - - # Check if the file name contains "encoded" - if '.encoded.' in filename and encryption_key != '': - - # store e.g. Node_1 from last_result.encoded.Node_1.1.log - tmp_SyncHubNodeName = filename.split('.')[2] - - # Decrypt the entire file - with open(file_path, 'r+') as f: - encrypted_data = f.read() - decrypted_data = decrypt_data(encrypted_data, encryption_key) - - # Write the decrypted data back to the file - f.seek(0) - f.write(decrypted_data) - f.truncate() - - # Rename the file e.g. from last_result.encoded.Node_1.1.log to last_result.decoded.Node_1.1.log - new_filename = filename.replace('.encoded.', '.decoded.') - os.rename(file_path, os.path.join(file_dir, new_filename)) - - elif filename == 'last_result.log' : - new_filename = filename - else: - # skipping decoded and other files - continue - - # Open the decrypted file and process its contents - with open(os.path.join(file_dir, new_filename), 'r') as f: - newLines = f.read().split('\n') - - # if the script produced some outpout, clean it up to ensure it's the correct format - # cleanup - select only lines containing a separator to filter out unnecessary data - newLines = list(filter(lambda x: '|' in x, newLines)) - - for line in newLines: - columns = line.split("|") - # There has to be always 9 columns - if len(columns) == 9: - # Create a tuple containing values to be inserted into the database. - # Each value corresponds to a column in the table in the order of the columns. - # must match the Plugins_Objects and Plugins_Events database tables and can be used as input for the plugin_object_class. - sqlParams.append( - ( - 0, # "Index" placeholder - plugin["unique_prefix"], # "Plugin" column value from the plugin dictionary - columns[0], # "Object_PrimaryID" value from columns list - columns[1], # "Object_SecondaryID" value from columns list - 'null', # Placeholder for "DateTimeCreated" column - columns[2], # "DateTimeChanged" value from columns list - columns[3], # "Watched_Value1" value from columns list - columns[4], # "Watched_Value2" value from columns list - columns[5], # "Watched_Value3" value from columns list - columns[6], # "Watched_Value4" value from columns list - 'not-processed', # "Status" column (placeholder) - columns[7], # "Extra" value from columns list - 'null', # Placeholder for "UserData" column - columns[8], # "ForeignKey" value from columns list - tmp_SyncHubNodeName # Sync Hub Node name - ) + for line in newLines: + columns = line.split("|") + # There have to be always 9 columns + if len(columns) == 9: + # Create a tuple containing values to be inserted into the database. + # Each value corresponds to a column in the table in the order of the columns. + # must match the Plugins_Objects and Plugins_Events database tables and can be used as input for the plugin_object_class. + sqlParams.append( + ( + 0, # "Index" placeholder + plugin["unique_prefix"], # "Plugin" column value from the plugin dictionary + columns[0], # "Object_PrimaryID" value from columns list + columns[1], # "Object_SecondaryID" value from columns list + 'null', # Placeholder for "DateTimeCreated" column + columns[2], # "DateTimeChanged" value from columns list + columns[3], # "Watched_Value1" value from columns list + columns[4], # "Watched_Value2" value from columns list + columns[5], # "Watched_Value3" value from columns list + columns[6], # "Watched_Value4" value from columns list + 'not-processed', # "Status" column (placeholder) + columns[7], # "Extra" value from columns list + 'null', # Placeholder for "UserData" column + columns[8], # "ForeignKey" value from columns list + tmp_SyncHubNodeName # Sync Hub Node name ) - else: - mylog('none', ['[Plugins] Skipped invalid line in the output: ', line]) - else: - mylog('debug', [f'[Plugins] The file {file_path} does not exist']) + ) + else: + mylog('none', ['[Plugins] Skipped invalid line in the output: ', line]) # TODO: delete processed files # os.rename(file_path, os.path.join(file_dir, new_filename)) @@ -430,9 +395,6 @@ def execute_plugin(db, all_plugins, plugin, pluginsState = plugins_state() ): return pluginsState - - - #------------------------------------------------------------------------------- # Check if watched values changed for the given plugin def process_plugin_events(db, plugin, pluginsState, plugEventsArr): diff --git a/server/plugin_utils.py b/server/plugin_utils.py index 8d85bd30..836a16fd 100755 --- a/server/plugin_utils.py +++ b/server/plugin_utils.py @@ -243,4 +243,53 @@ def getPluginObject(keyValues): return {} +# ------------------------------------------------------------------ +# decode any encoded last_result files +def decode_and_rename_files(file_dir, file_prefix): + """ + Decodes and renames files in the specified directory if they are encrypted. + Returns a list of files to be processed and the Sync Hub Node name. + """ + # Initialize the list of files to be processed and Sync Hub Node name + files_to_process = [] + # key to decrypt data if available + encryption_key = get_setting_value('SYNC_encryption_key') + + # Check for files starting with the specified prefix + matching_files = [f for f in os.listdir(file_dir) if f.startswith(file_prefix)] + + for filename in matching_files: + # Create the full file path + file_path = os.path.join(file_dir, filename) + + # Check if the file exists + if os.path.exists(file_path): + + # Check if the file name contains "encoded" + if '.encoded.' in filename and encryption_key: + # Decrypt the entire file + with open(file_path, 'r+') as f: + encrypted_data = f.read() + decrypted_data = decrypt_data(encrypted_data, encryption_key) + + # Write the decrypted data back to the file + f.seek(0) + f.write(decrypted_data) + f.truncate() + + # Rename the file e.g. from last_result.encoded.Node_1.1.log to last_result.decoded.Node_1.1.log + new_filename = filename.replace('.encoded.', '.decoded.') + os.rename(file_path, os.path.join(file_dir, new_filename)) + + files_to_process.append(new_filename) + + elif filename == 'last_result.log': + files_to_process.append(filename) + else: + # Skipping decoded and other files + continue + else: + mylog('debug', [f'[Plugins] The file {file_path} does not exist']) + + return files_to_process \ No newline at end of file