From 0c919e0edc85a25a39999596eb1f5eec85ad5635 Mon Sep 17 00:00:00 2001 From: dgtlmoon Date: Wed, 7 May 2025 17:47:11 +0200 Subject: [PATCH] WIP --- .../watchlist/templates/watch-overview.html | 6 +- changedetectionio/flask_app.py | 8 ++ changedetectionio/model/Watch.py | 4 + changedetectionio/realtime/socket_server.py | 127 ++++++++++-------- changedetectionio/static/js/socket.js | 60 ++------- changedetectionio/update_worker.py | 17 ++- requirements.txt | 3 + 7 files changed, 112 insertions(+), 113 deletions(-) diff --git a/changedetectionio/blueprint/watchlist/templates/watch-overview.html b/changedetectionio/blueprint/watchlist/templates/watch-overview.html index f552f2ad..e5e20cc6 100644 --- a/changedetectionio/blueprint/watchlist/templates/watch-overview.html +++ b/changedetectionio/blueprint/watchlist/templates/watch-overview.html @@ -100,14 +100,13 @@ {% endif %} {% for watch in (watches|sort(attribute=sort_attribute, reverse=sort_order == 'asc'))|pagination_slice(skip=pagination.skip) %} - {% set is_unviewed = watch.newest_history_key| int > watch.last_viewed and watch.history_n>=2 %} {% set checking_now = is_checking_now(watch) %} {% if watch.uuid in queued_uuids %}Queued{% else %}Recheck{% endif %} + Edit {% if watch.history_n >= 2 %} @@ -214,7 +214,7 @@ {% set open_diff_in_new_tab = datastore.data['settings']['application']['ui'].get('open_diff_in_new_tab') %} {% set target_attr = ' target="' ~ watch.uuid ~ '"' if open_diff_in_new_tab else '' %} - {% if is_unviewed %} + {% if watch.has_unviewed %} History {% else %} History diff --git a/changedetectionio/flask_app.py b/changedetectionio/flask_app.py index 2c31b7fc..f6392936 100644 --- a/changedetectionio/flask_app.py +++ b/changedetectionio/flask_app.py @@ -7,6 +7,7 @@ import queue import threading import time import timeago +from blinker import signal from changedetectionio.strtobool import strtobool from threading import Event @@ -28,6 +29,10 @@ from flask_login import current_user from flask_paginate import Pagination, get_page_parameter from flask_restful import abort, Api from flask_cors import CORS + +# Create specific signals for application events +# Make this a global singleton to avoid multiple signal objects +watch_check_completed = signal('watch_check_completed', doc='Signal sent when a watch check is completed') from flask_wtf import CSRFProtect from loguru import logger import eventlet @@ -225,6 +230,9 @@ def changedetection_app(config=None, datastore_o=None): # so far just for read-only via tests, but this will be moved eventually to be the main source # (instead of the global var) app.config['DATASTORE'] = datastore_o + + # Store the signal in the app config to ensure it's accessible everywhere + app.config['WATCH_CHECK_COMPLETED_SIGNAL'] = watch_check_completed login_manager = flask_login.LoginManager(app) login_manager.login_view = 'login' diff --git a/changedetectionio/model/Watch.py b/changedetectionio/model/Watch.py index 86e93983..4d03978b 100644 --- a/changedetectionio/model/Watch.py +++ b/changedetectionio/model/Watch.py @@ -60,6 +60,10 @@ class model(watch_base): return False + @property + def has_unviewed(self): + return int(self.newest_history_key) > int(self['last_viewed']) and self.__history_n >= 2 + def ensure_data_dir_exists(self): if not os.path.isdir(self.watch_data_dir): logger.debug(f"> Creating data dir {self.watch_data_dir}") diff --git a/changedetectionio/realtime/socket_server.py b/changedetectionio/realtime/socket_server.py index d9802d70..d7801154 100644 --- a/changedetectionio/realtime/socket_server.py +++ b/changedetectionio/realtime/socket_server.py @@ -4,10 +4,43 @@ import threading import json import time from loguru import logger +import blinker -from changedetectionio.flask_app import _jinja2_filter_datetime +from changedetectionio.flask_app import _jinja2_filter_datetime, watch_check_completed +class SignalHandler: + """A standalone class to receive signals""" + def __init__(self, socketio_instance): + self.socketio_instance = socketio_instance + + # Get signal from app config + app_signal = socketio_instance.main_app.config.get('WATCH_CHECK_COMPLETED_SIGNAL') + if app_signal: + app_signal.connect(self.handle_signal, weak=False) + logger.info("SignalHandler: Connected to signal from app config") + else: + # Fallback if not in app config + from changedetectionio.flask_app import watch_check_completed as wcc + wcc.connect(self.handle_signal, weak=False) + logger.info("SignalHandler: Connected to signal from direct import") + + def handle_signal(self, *args, **kwargs): + logger.info(f"SignalHandler: Signal received with {len(args)} args and {len(kwargs)} kwargs") + # Safely extract the watch UUID from kwargs + watch_uuid = kwargs.get('watch_uuid') + if watch_uuid: + # Get the datastore from the socket instance + datastore = self.socketio_instance.datastore + # Get the watch object from the datastore + watch = datastore.data['watching'].get(watch_uuid) + if watch: + # Forward to the socket instance with the watch parameter + self.socketio_instance.handle_watch_update(watch=watch) + logger.info(f"Signal handler processed watch UUID {watch_uuid}") + else: + logger.warning(f"Watch UUID {watch_uuid} not found in datastore") + class ChangeDetectionSocketIO: def __init__(self, app, datastore): self.main_app = app @@ -28,76 +61,60 @@ class ChangeDetectionSocketIO: # Just start a background thread to periodically emit watch status self.thread = None self.thread_lock = threading.Lock() + + # Create a dedicated signal handler + self.signal_handler = SignalHandler(self) - def start_background_task(self): - """Start the background task if it's not already running""" - with self.thread_lock: - if self.thread is None: - self.thread = threading.Thread(target=self.background_task) - self.thread.daemon = True - self.thread.start() - logger.info("Socket.IO: Started background task thread") - def handle_connect(self): """Handle client connection""" logger.info("Socket.IO: Client connected") - # Start the background task when the first client connects - self.start_background_task() - + def handle_disconnect(self): """Handle client disconnection""" logger.info("Socket.IO: Client disconnected") - def background_task(self): - """Background task that emits watch status periodically""" - check_interval = 4 # seconds between updates - + def handle_watch_update(self, **kwargs): + """Handle watch update signal from blinker""" try: + watch = kwargs.get('watch') + # Emit the watch update to all connected clients with self.main_app.app_context(): - while True: - try: - # Collect all watch data - watches_data = [] - - # Get list of watches that are currently running - from changedetectionio.flask_app import running_update_threads - currently_checking = [] - - # Make a copy to avoid issues if the list changes - threads_snapshot = list(running_update_threads) - for thread in threads_snapshot: - if hasattr(thread, 'current_uuid') and thread.current_uuid: - currently_checking.append(thread.current_uuid) - self.socketio.emit("checking_now", list(currently_checking)) + from changedetectionio.flask_app import running_update_threads, update_q + + # Get list of watches that are currently running + running_uuids = [] + for t in running_update_threads: + if hasattr(t, 'current_uuid') and t.current_uuid: + running_uuids.append(t.current_uuid) + + # Get list of watches in the queue + queue_list = [] + for q_item in update_q.queue: + if hasattr(q_item, 'item') and 'uuid' in q_item.item: + queue_list.append(q_item.item['uuid']) + + # Create a simplified watch data object to send to clients + watch_data = { + 'uuid': watch.get('uuid'), + 'last_checked_text': _jinja2_filter_datetime(watch), + 'last_checked': watch.get('last_checked'), + 'last_changed': watch.get('last_changed'), + 'queued': True if watch.get('uuid') in queue_list else False, + 'checking_now': True if watch.get('uuid') in running_uuids else False, + 'unviewed': watch.has_unviewed, + } + self.socketio.emit("watch_update", watch_data) + logger.debug(f"Socket.IO: Emitted update for watch {watch.uuid}") - # Send all watch data periodically - for uuid, watch in self.datastore.data['watching'].items(): - # Simplified watch data to avoid sending everything - simplified_data = { - 'uuid': uuid, - 'last_checked': _jinja2_filter_datetime(watch), -# 'history_n': watch.history_n if hasattr(watch, 'history_n') else 0, - } - #watches_data.append(simplified_data) - - # Emit all watch data periodically - self.socketio.emit('watch_data', watches_data) - logger.debug(f"Socket.IO: Emitted watch data for {len(watches_data)} watches") - - except Exception as e: - logger.error(f"Socket.IO error in background task: {str(e)}") - - # Wait before next update - time.sleep(check_interval) - except Exception as e: - logger.error(f"Socket.IO background task failed: {str(e)}") - + logger.error(f"Socket.IO error in handle_watch_update: {str(e)}") + + def run(self, host='0.0.0.0', port=5005): """Run the Socket.IO server on a separate port""" # Start the background task when the server starts - self.start_background_task() + #self.start_background_task() # Run the Socket.IO server # Use 0.0.0.0 to listen on all interfaces diff --git a/changedetectionio/static/js/socket.js b/changedetectionio/static/js/socket.js index 201a9b7c..2b25d7be 100644 --- a/changedetectionio/static/js/socket.js +++ b/changedetectionio/static/js/socket.js @@ -31,60 +31,18 @@ $(document).ready(function() { }); // Listen for periodically emitted watch data - socket.on('watch_data', function(watches) { -/* console.log('Received watch data updates'); + socket.on('watch_update', function(watch) { + console.log(`Watch update ${watch.uuid}`); - - // Update all watches with their current data - watches.forEach(function(watch) { - const $watchRow = $('tr[data-watch-uuid="' + watch.uuid + '"]'); - if ($watchRow.length) { - updateWatchRow($watchRow, watch); - } - });*/ - }); - - // Function to update a watch row with new data - function updateWatchRow($row, data) { - // Update the last-checked time - return; - const $lastChecked = $row.find('.last-checked'); - if ($lastChecked.length) { - // Update data-timestamp attribute - $lastChecked.attr('data-timestamp', data.last_checked); - - // Only show timeago if not currently checking - if (!data.checking) { - let $timeagoSpan = $lastChecked.find('.timeago'); - - // If there's no timeago span yet, create one - if (!$timeagoSpan.length) { - $lastChecked.html(''); - $timeagoSpan = $lastChecked.find('.timeago'); - } - - if (data.last_checked > 0) { - // Format as timeago if we have the timeago library available - if (typeof timeago !== 'undefined') { - $timeagoSpan.text(timeago.format(data.last_checked * 1000)); - } else { - // Simple fallback if timeago isn't available - const date = new Date(data.last_checked * 1000); - $timeagoSpan.text(date.toLocaleString()); - } - } else { - $lastChecked.text('Not yet'); - } - } + + const $watchRow = $('tr[data-watch-uuid="' + watch.uuid + '"]'); + if ($watchRow.length) { + $($watchRow).toggleClass('checking-now', watch.checking_now); + $($watchRow).toggleClass('queued', watch.queued); + $($watchRow).toggleClass('unviewed', watch.unviewed); } + }); - - // Toggle the unviewed class based on viewed status -// $row.toggleClass('unviewed', data.unviewed_history === false); - - // If the watch is currently being checked - // $row.toggleClass('checking-now', data.checking === true); - } } catch (e) { // If Socket.IO fails to initialize, just log it and continue console.log('Socket.IO initialization error:', e); diff --git a/changedetectionio/update_worker.py b/changedetectionio/update_worker.py index 6f05b7f7..868b9040 100644 --- a/changedetectionio/update_worker.py +++ b/changedetectionio/update_worker.py @@ -2,6 +2,7 @@ from .processors.exceptions import ProcessorException import changedetectionio.content_fetchers.exceptions as content_fetchers_exceptions from changedetectionio.processors.text_json_diff.processor import FilterNotFoundInResponse from changedetectionio import html_tools +from changedetectionio.flask_app import watch_check_completed import importlib import os @@ -242,17 +243,16 @@ class update_worker(threading.Thread): os.unlink(full_path) def run(self): - + while not self.app.config.exit.is_set(): update_handler = None + watch = None try: queued_item_data = self.q.get(block=False) except queue.Empty: pass - else: - uuid = queued_item_data.item.get('uuid') fetch_start_time = round(time.time()) # Also used for a unique history key for now self.current_uuid = uuid @@ -272,6 +272,9 @@ class update_worker(threading.Thread): logger.info(f"Processing watch UUID {uuid} Priority {queued_item_data.priority} URL {watch['url']}") try: + #watch_check_completed.send(sender=self, watch=watch) + watch_check_completed.send(watch_uuid=watch['uuid']) + # Processor is what we are using for detecting the "Change" processor = watch.get('processor', 'text_json_diff') @@ -588,12 +591,18 @@ class update_worker(threading.Thread): 'check_count': count }) - self.current_uuid = None # Done self.q.task_done() + + # Send signal for watch check completion with the watch data + if watch: + logger.info(f"Sending watch_check_completed signal for UUID {watch['uuid']}") + watch_check_completed.send(watch_uuid=watch['uuid']) + update_handler = None logger.debug(f"Watch {uuid} done in {time.time()-fetch_start_time:.2f}s") + # Give the CPU time to interrupt time.sleep(0.1) diff --git a/requirements.txt b/requirements.txt index 907640c5..5f025950 100644 --- a/requirements.txt +++ b/requirements.txt @@ -118,3 +118,6 @@ psutil==7.0.0 ruff >= 0.11.2 pre_commit >= 4.2.0 + +# For events between checking and socketio updates +blinker