Fix polling update with gevent instead of threads
This commit is contained in:
@@ -8,6 +8,8 @@ from loguru import logger
|
||||
from blinker import signal
|
||||
|
||||
from changedetectionio import strtobool
|
||||
from changedetectionio.flask_app import running_update_threads
|
||||
|
||||
|
||||
class SignalHandler:
|
||||
"""A standalone class to receive signals"""
|
||||
@@ -25,6 +27,15 @@ class SignalHandler:
|
||||
queue_length_signal.connect(self.handle_queue_length, weak=False)
|
||||
logger.info("SignalHandler: Connected to queue_length signal")
|
||||
|
||||
|
||||
# Create and start the queue update thread using gevent
|
||||
import gevent
|
||||
logger.info("Using gevent for polling thread")
|
||||
self.polling_emitter_thread = gevent.spawn(self.polling_emit_running_or_queued_watches)
|
||||
|
||||
# Store the thread reference in socketio for clean shutdown
|
||||
self.socketio_instance.polling_emitter_thread = self.polling_emitter_thread
|
||||
|
||||
def handle_signal(self, *args, **kwargs):
|
||||
logger.trace(f"SignalHandler: Signal received with {len(args)} args and {len(kwargs)} kwargs")
|
||||
# Safely extract the watch UUID from kwargs
|
||||
@@ -64,6 +75,50 @@ class SignalHandler:
|
||||
logger.error(f"Socket.IO error in handle_queue_length: {str(e)}")
|
||||
|
||||
|
||||
def polling_emit_running_or_queued_watches(self):
|
||||
"""Greenlet that periodically updates the browser/frontend with current state of who is being checked or queued
|
||||
This is because sometimes the browser page could reload (like on clicking on a link) but the data is old
|
||||
"""
|
||||
logger.info("Queue update greenlet started")
|
||||
|
||||
# Import the watch_check_update signal, update_q, and running_update_threads here to avoid circular imports
|
||||
from changedetectionio.flask_app import app, running_update_threads
|
||||
watch_check_update = signal('watch_check_update')
|
||||
|
||||
# Use gevent sleep for non-blocking operation
|
||||
from gevent import sleep as gevent_sleep
|
||||
|
||||
# Get the stop event from the socketio instance
|
||||
stop_event = self.socketio_instance.stop_event if hasattr(self.socketio_instance, 'stop_event') else None
|
||||
|
||||
# Run until explicitly stopped
|
||||
while stop_event is None or not stop_event.is_set():
|
||||
try:
|
||||
# For each item in the queue, send a signal
|
||||
for t in running_update_threads:
|
||||
if hasattr(t, 'current_uuid') and t.current_uuid:
|
||||
logger.debug(f"Sending update for {t.current_uuid}")
|
||||
# Send with app_context to ensure proper URL generation
|
||||
with app.app_context():
|
||||
watch_check_update.send(app_context=app, watch_uuid=t.current_uuid)
|
||||
# Yield control back to gevent after each send to prevent blocking
|
||||
gevent_sleep(0.1) # Small sleep to yield control
|
||||
|
||||
# Check if we need to stop in the middle of processing
|
||||
if stop_event is not None and stop_event.is_set():
|
||||
break
|
||||
|
||||
# Sleep between polling/update cycles
|
||||
gevent_sleep(2)
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error in queue update greenlet: {str(e)}")
|
||||
# Sleep a bit to avoid flooding logs in case of persistent error
|
||||
gevent_sleep(0.5)
|
||||
|
||||
logger.info("Queue update greenlet stopped")
|
||||
|
||||
|
||||
def handle_watch_update(socketio, **kwargs):
|
||||
"""Handle watch update signal from blinker"""
|
||||
try:
|
||||
@@ -87,10 +142,7 @@ def handle_watch_update(socketio, **kwargs):
|
||||
queue_list.append(q_item.item['uuid'])
|
||||
|
||||
error_texts = ""
|
||||
# So anything with 'url_for' etc needs to be triggered with app_context of the current app when sending the signal
|
||||
# with app.app_context():
|
||||
# watch_check_update.send(app_context=app, watch_uuid=uuid)
|
||||
|
||||
# Get the error texts from the watch
|
||||
error_texts = watch.compile_error_texts()
|
||||
|
||||
# Create a simplified watch data object to send to clients
|
||||
@@ -111,42 +163,19 @@ def handle_watch_update(socketio, **kwargs):
|
||||
'event_timestamp': time.time()
|
||||
}
|
||||
|
||||
# Debug what's being emitted
|
||||
logger.debug(f"Emitting 'watch_update' event for {watch.get('uuid')}, data: {watch_data}")
|
||||
|
||||
# Emit to all clients (no 'broadcast' parameter needed - it's the default behavior)
|
||||
socketio.emit("watch_update", watch_data)
|
||||
logger.trace(f"Socket.IO: Emitted update for watch {watch.get('uuid')}, Checking now: {watch_data['checking_now']}")
|
||||
|
||||
# Log after successful emit
|
||||
logger.info(f"Socket.IO: Emitted update for watch {watch.get('uuid')}, Checking now: {watch_data['checking_now']}")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Socket.IO error in handle_watch_update: {str(e)}")
|
||||
|
||||
|
||||
def queue_update_thread(socketio, stop_event):
|
||||
"""Thread that periodically sends updates for watches in the queue"""
|
||||
logger.info("Queue update thread started")
|
||||
|
||||
# Import the watch_check_update signal and update_q here to avoid circular imports
|
||||
from changedetectionio.flask_app import watch_check_update, update_q
|
||||
|
||||
while not stop_event.is_set():
|
||||
try:
|
||||
# For each item in the queue, send a signal
|
||||
for q_item in update_q.queue:
|
||||
if hasattr(q_item, 'item') and 'uuid' in q_item.item:
|
||||
# Send the signal with the watch_uuid parameter
|
||||
watch_check_update.send(watch_uuid=q_item.item['uuid'])
|
||||
logger.debug(f"Queue update thread: Sent update for watch {q_item.item['uuid']}")
|
||||
|
||||
# Sleep for 3 seconds checking every 0.1 to quit
|
||||
for _ in range(30): # Check stop_event every 0.1 seconds for 3 seconds total
|
||||
if stop_event.is_set():
|
||||
break
|
||||
time.sleep(0.1)
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error in queue update thread: {str(e)}")
|
||||
# Sleep a bit to avoid flooding logs in case of persistent error
|
||||
time.sleep(0.5)
|
||||
|
||||
logger.info("Queue update thread stopped")
|
||||
|
||||
def init_socketio(app, datastore):
|
||||
"""Initialize SocketIO with the main Flask app"""
|
||||
# Use the threading async_mode instead of eventlet
|
||||
@@ -206,20 +235,11 @@ def init_socketio(app, datastore):
|
||||
# Store the datastore reference on the socketio object for later use
|
||||
socketio.datastore = datastore
|
||||
|
||||
# Create a stop event for our queue update thread
|
||||
stop_event = threading.Event()
|
||||
# Create a stop event for our queue update thread using gevent Event
|
||||
import gevent.event
|
||||
stop_event = gevent.event.Event()
|
||||
socketio.stop_event = stop_event
|
||||
|
||||
# Create and start the queue update thread
|
||||
queue_thread = threading.Thread(
|
||||
target=queue_update_thread,
|
||||
args=(socketio, stop_event),
|
||||
daemon=True,
|
||||
name="QueueUpdateThread"
|
||||
)
|
||||
socketio.queue_thread = queue_thread
|
||||
queue_thread.start()
|
||||
logger.info("Queue update thread initialized")
|
||||
|
||||
# Add a shutdown method to the socketio object
|
||||
def shutdown():
|
||||
@@ -232,11 +252,15 @@ def init_socketio(app, datastore):
|
||||
socketio.stop_event.set()
|
||||
logger.info("Socket.IO: Signaled queue update thread to stop")
|
||||
|
||||
# Wait for the thread to exit (with timeout)
|
||||
if hasattr(socketio, 'queue_thread') and socketio.queue_thread.is_alive():
|
||||
socketio.queue_thread.join(timeout=5.0)
|
||||
logger.info("Socket.IO: Queue update thread joined" if not socketio.queue_thread.is_alive()
|
||||
else "Socket.IO: Queue update thread did not exit in time")
|
||||
# Wait for the greenlet to exit (with timeout)
|
||||
if hasattr(socketio, 'polling_emitter_thread'):
|
||||
try:
|
||||
# For gevent greenlets
|
||||
socketio.polling_emitter_thread.join(timeout=5)
|
||||
logger.info("Socket.IO: Queue update greenlet joined successfully")
|
||||
except Exception as e:
|
||||
logger.error(f"Error joining greenlet: {str(e)}")
|
||||
logger.info("Socket.IO: Queue update greenlet did not exit in time")
|
||||
|
||||
# Close any remaining client connections
|
||||
#if hasattr(socketio, 'server'):
|
||||
|
||||
@@ -37,12 +37,25 @@ $(document).ready(function () {
|
||||
|
||||
// Connection status logging
|
||||
socket.on('connect', function () {
|
||||
console.log('Socket.IO connected');
|
||||
console.log('Socket.IO connected with path:', socketio_url);
|
||||
console.log('Socket transport:', socket.io.engine.transport.name);
|
||||
bindAjaxHandlerButtonsEvents();
|
||||
});
|
||||
|
||||
socket.on('disconnect', function () {
|
||||
console.log('Socket.IO disconnected');
|
||||
socket.on('connect_error', function(error) {
|
||||
console.error('Socket.IO connection error:', error);
|
||||
});
|
||||
|
||||
socket.on('connect_timeout', function() {
|
||||
console.error('Socket.IO connection timeout');
|
||||
});
|
||||
|
||||
socket.on('error', function(error) {
|
||||
console.error('Socket.IO error:', error);
|
||||
});
|
||||
|
||||
socket.on('disconnect', function (reason) {
|
||||
console.log('Socket.IO disconnected, reason:', reason);
|
||||
$('.ajax-op').off('.ajaxHandlerNamespace')
|
||||
});
|
||||
|
||||
@@ -52,11 +65,18 @@ $(document).ready(function () {
|
||||
})
|
||||
|
||||
// Listen for periodically emitted watch data
|
||||
// Add an explicit watch_update listener
|
||||
console.log('Adding watch_update event listener');
|
||||
socket.on('watch_update', function (watch) {
|
||||
// Log the entire watch object for debugging
|
||||
console.log('!!! WATCH UPDATE EVENT RECEIVED !!!');
|
||||
console.log(`${watch.event_timestamp} - Watch update ${watch.uuid} - Checking now - ${watch.checking_now} - UUID in URL ${window.location.href.includes(watch.uuid)}`);
|
||||
console.log('Watch data:', watch);
|
||||
|
||||
// Updating watch table rows
|
||||
const $watchRow = $('tr[data-watch-uuid="' + watch.uuid + '"]');
|
||||
console.log('Found watch row elements:', $watchRow.length);
|
||||
|
||||
if ($watchRow.length) {
|
||||
$($watchRow).toggleClass('checking-now', watch.checking_now);
|
||||
$($watchRow).toggleClass('queued', watch.queued);
|
||||
@@ -72,6 +92,8 @@ $(document).ready(function () {
|
||||
$('td.last-checked .innertext', $watchRow).text(watch.last_checked_text)
|
||||
$('td.last-checked', $watchRow).data('timestamp', watch.last_checked).data('fetchduration', watch.fetch_time);
|
||||
$('td.last-checked', $watchRow).data('eta_complete', watch.last_checked + watch.fetch_time);
|
||||
|
||||
console.log('Updated UI for watch:', watch.uuid);
|
||||
}
|
||||
$('body').toggleClass('checking-now', watch.checking_now && window.location.href.includes(watch.uuid));
|
||||
});
|
||||
|
||||
Reference in New Issue
Block a user