From f780fe9686b64604ff51e0f2cff3c54da61652c6 Mon Sep 17 00:00:00 2001 From: dgtlmoon Date: Sat, 17 May 2025 10:15:59 +0200 Subject: [PATCH] Fix polling update with gevent instead of threads --- changedetectionio/realtime/socket_server.py | 128 ++++++++++++-------- changedetectionio/static/js/socket.js | 30 ++++- 2 files changed, 102 insertions(+), 56 deletions(-) diff --git a/changedetectionio/realtime/socket_server.py b/changedetectionio/realtime/socket_server.py index b2463c0b..de828340 100644 --- a/changedetectionio/realtime/socket_server.py +++ b/changedetectionio/realtime/socket_server.py @@ -8,6 +8,8 @@ from loguru import logger from blinker import signal from changedetectionio import strtobool +from changedetectionio.flask_app import running_update_threads + class SignalHandler: """A standalone class to receive signals""" @@ -25,6 +27,15 @@ class SignalHandler: queue_length_signal.connect(self.handle_queue_length, weak=False) logger.info("SignalHandler: Connected to queue_length signal") + + # Create and start the queue update thread using gevent + import gevent + logger.info("Using gevent for polling thread") + self.polling_emitter_thread = gevent.spawn(self.polling_emit_running_or_queued_watches) + + # Store the thread reference in socketio for clean shutdown + self.socketio_instance.polling_emitter_thread = self.polling_emitter_thread + def handle_signal(self, *args, **kwargs): logger.trace(f"SignalHandler: Signal received with {len(args)} args and {len(kwargs)} kwargs") # Safely extract the watch UUID from kwargs @@ -64,6 +75,50 @@ class SignalHandler: logger.error(f"Socket.IO error in handle_queue_length: {str(e)}") + def polling_emit_running_or_queued_watches(self): + """Greenlet that periodically updates the browser/frontend with current state of who is being checked or queued + This is because sometimes the browser page could reload (like on clicking on a link) but the data is old + """ + logger.info("Queue update greenlet started") + + # Import the watch_check_update signal, update_q, and running_update_threads here to avoid circular imports + from changedetectionio.flask_app import app, running_update_threads + watch_check_update = signal('watch_check_update') + + # Use gevent sleep for non-blocking operation + from gevent import sleep as gevent_sleep + + # Get the stop event from the socketio instance + stop_event = self.socketio_instance.stop_event if hasattr(self.socketio_instance, 'stop_event') else None + + # Run until explicitly stopped + while stop_event is None or not stop_event.is_set(): + try: + # For each item in the queue, send a signal + for t in running_update_threads: + if hasattr(t, 'current_uuid') and t.current_uuid: + logger.debug(f"Sending update for {t.current_uuid}") + # Send with app_context to ensure proper URL generation + with app.app_context(): + watch_check_update.send(app_context=app, watch_uuid=t.current_uuid) + # Yield control back to gevent after each send to prevent blocking + gevent_sleep(0.1) # Small sleep to yield control + + # Check if we need to stop in the middle of processing + if stop_event is not None and stop_event.is_set(): + break + + # Sleep between polling/update cycles + gevent_sleep(2) + + except Exception as e: + logger.error(f"Error in queue update greenlet: {str(e)}") + # Sleep a bit to avoid flooding logs in case of persistent error + gevent_sleep(0.5) + + logger.info("Queue update greenlet stopped") + + def handle_watch_update(socketio, **kwargs): """Handle watch update signal from blinker""" try: @@ -87,10 +142,7 @@ def handle_watch_update(socketio, **kwargs): queue_list.append(q_item.item['uuid']) error_texts = "" - # So anything with 'url_for' etc needs to be triggered with app_context of the current app when sending the signal -# with app.app_context(): -# watch_check_update.send(app_context=app, watch_uuid=uuid) - + # Get the error texts from the watch error_texts = watch.compile_error_texts() # Create a simplified watch data object to send to clients @@ -111,42 +163,19 @@ def handle_watch_update(socketio, **kwargs): 'event_timestamp': time.time() } + # Debug what's being emitted + logger.debug(f"Emitting 'watch_update' event for {watch.get('uuid')}, data: {watch_data}") + + # Emit to all clients (no 'broadcast' parameter needed - it's the default behavior) socketio.emit("watch_update", watch_data) - logger.trace(f"Socket.IO: Emitted update for watch {watch.get('uuid')}, Checking now: {watch_data['checking_now']}") + + # Log after successful emit + logger.info(f"Socket.IO: Emitted update for watch {watch.get('uuid')}, Checking now: {watch_data['checking_now']}") except Exception as e: logger.error(f"Socket.IO error in handle_watch_update: {str(e)}") -def queue_update_thread(socketio, stop_event): - """Thread that periodically sends updates for watches in the queue""" - logger.info("Queue update thread started") - - # Import the watch_check_update signal and update_q here to avoid circular imports - from changedetectionio.flask_app import watch_check_update, update_q - - while not stop_event.is_set(): - try: - # For each item in the queue, send a signal - for q_item in update_q.queue: - if hasattr(q_item, 'item') and 'uuid' in q_item.item: - # Send the signal with the watch_uuid parameter - watch_check_update.send(watch_uuid=q_item.item['uuid']) - logger.debug(f"Queue update thread: Sent update for watch {q_item.item['uuid']}") - - # Sleep for 3 seconds checking every 0.1 to quit - for _ in range(30): # Check stop_event every 0.1 seconds for 3 seconds total - if stop_event.is_set(): - break - time.sleep(0.1) - - except Exception as e: - logger.error(f"Error in queue update thread: {str(e)}") - # Sleep a bit to avoid flooding logs in case of persistent error - time.sleep(0.5) - - logger.info("Queue update thread stopped") - def init_socketio(app, datastore): """Initialize SocketIO with the main Flask app""" # Use the threading async_mode instead of eventlet @@ -206,20 +235,11 @@ def init_socketio(app, datastore): # Store the datastore reference on the socketio object for later use socketio.datastore = datastore - # Create a stop event for our queue update thread - stop_event = threading.Event() + # Create a stop event for our queue update thread using gevent Event + import gevent.event + stop_event = gevent.event.Event() socketio.stop_event = stop_event - - # Create and start the queue update thread - queue_thread = threading.Thread( - target=queue_update_thread, - args=(socketio, stop_event), - daemon=True, - name="QueueUpdateThread" - ) - socketio.queue_thread = queue_thread - queue_thread.start() - logger.info("Queue update thread initialized") + # Add a shutdown method to the socketio object def shutdown(): @@ -232,11 +252,15 @@ def init_socketio(app, datastore): socketio.stop_event.set() logger.info("Socket.IO: Signaled queue update thread to stop") - # Wait for the thread to exit (with timeout) - if hasattr(socketio, 'queue_thread') and socketio.queue_thread.is_alive(): - socketio.queue_thread.join(timeout=5.0) - logger.info("Socket.IO: Queue update thread joined" if not socketio.queue_thread.is_alive() - else "Socket.IO: Queue update thread did not exit in time") + # Wait for the greenlet to exit (with timeout) + if hasattr(socketio, 'polling_emitter_thread'): + try: + # For gevent greenlets + socketio.polling_emitter_thread.join(timeout=5) + logger.info("Socket.IO: Queue update greenlet joined successfully") + except Exception as e: + logger.error(f"Error joining greenlet: {str(e)}") + logger.info("Socket.IO: Queue update greenlet did not exit in time") # Close any remaining client connections #if hasattr(socketio, 'server'): diff --git a/changedetectionio/static/js/socket.js b/changedetectionio/static/js/socket.js index ca3acc53..0193e0e8 100644 --- a/changedetectionio/static/js/socket.js +++ b/changedetectionio/static/js/socket.js @@ -37,12 +37,25 @@ $(document).ready(function () { // Connection status logging socket.on('connect', function () { - console.log('Socket.IO connected'); + console.log('Socket.IO connected with path:', socketio_url); + console.log('Socket transport:', socket.io.engine.transport.name); bindAjaxHandlerButtonsEvents(); }); - socket.on('disconnect', function () { - console.log('Socket.IO disconnected'); + socket.on('connect_error', function(error) { + console.error('Socket.IO connection error:', error); + }); + + socket.on('connect_timeout', function() { + console.error('Socket.IO connection timeout'); + }); + + socket.on('error', function(error) { + console.error('Socket.IO error:', error); + }); + + socket.on('disconnect', function (reason) { + console.log('Socket.IO disconnected, reason:', reason); $('.ajax-op').off('.ajaxHandlerNamespace') }); @@ -52,11 +65,18 @@ $(document).ready(function () { }) // Listen for periodically emitted watch data + // Add an explicit watch_update listener + console.log('Adding watch_update event listener'); socket.on('watch_update', function (watch) { + // Log the entire watch object for debugging + console.log('!!! WATCH UPDATE EVENT RECEIVED !!!'); console.log(`${watch.event_timestamp} - Watch update ${watch.uuid} - Checking now - ${watch.checking_now} - UUID in URL ${window.location.href.includes(watch.uuid)}`); - + console.log('Watch data:', watch); + // Updating watch table rows const $watchRow = $('tr[data-watch-uuid="' + watch.uuid + '"]'); + console.log('Found watch row elements:', $watchRow.length); + if ($watchRow.length) { $($watchRow).toggleClass('checking-now', watch.checking_now); $($watchRow).toggleClass('queued', watch.queued); @@ -72,6 +92,8 @@ $(document).ready(function () { $('td.last-checked .innertext', $watchRow).text(watch.last_checked_text) $('td.last-checked', $watchRow).data('timestamp', watch.last_checked).data('fetchduration', watch.fetch_time); $('td.last-checked', $watchRow).data('eta_complete', watch.last_checked + watch.fetch_time); + + console.log('Updated UI for watch:', watch.uuid); } $('body').toggleClass('checking-now', watch.checking_now && window.location.href.includes(watch.uuid)); });