This commit is contained in:
dgtlmoon
2025-05-15 19:19:26 +02:00
parent 391fa9a807
commit cc484f07be
4 changed files with 159 additions and 7 deletions

View File

@@ -31,6 +31,16 @@ def sigshutdown_handler(_signo, _stack_frame):
logger.critical(f'Shutdown: Got Signal - {name} ({_signo}), Saving DB to disk and calling shutdown')
datastore.sync_to_json()
logger.success('Sync JSON to disk complete.')
# Shutdown socketio server if available
from changedetectionio.flask_app import socketio_server
if socketio_server and hasattr(socketio_server, 'shutdown'):
try:
logger.info("Shutting down Socket.IO server...")
socketio_server.shutdown()
except Exception as e:
logger.error(f"Error shutting down Socket.IO server: {str(e)}")
# Set flags for clean shutdown
datastore.stop_thread = True
app.config.exit.set()

View File

@@ -1,5 +1,6 @@
import queue
from blinker import signal
from loguru import logger
class SignalPriorityQueue(queue.PriorityQueue):
"""
@@ -12,7 +13,11 @@ class SignalPriorityQueue(queue.PriorityQueue):
def __init__(self, maxsize=0):
super().__init__(maxsize)
try:
self.queue_length_signal = signal('queue_length')
except Exception as e:
logger.critical(f"Exception: {e}")
def put(self, item, block=True, timeout=None):
# Call the parent's put method first
super().put(item, block, timeout)
@@ -24,4 +29,24 @@ class SignalPriorityQueue(queue.PriorityQueue):
watch_check_update = signal('watch_check_update')
if watch_check_update:
# Send the watch_uuid parameter
watch_check_update.send(watch_uuid=uuid)
watch_check_update.send(watch_uuid=uuid)
# Send queue_length signal with current queue size
try:
if self.queue_length_signal:
self.queue_length_signal.send(length=self.qsize())
except Exception as e:
logger.critical(f"Exception: {e}")
def get(self, block=True, timeout=None):
# Call the parent's get method first
item = super().get(block, timeout)
# Send queue_length signal with current queue size
try:
if self.queue_length_signal:
self.queue_length_signal.send(length=self.qsize())
except Exception as e:
logger.critical(f"Exception: {e}")
return item

View File

@@ -3,7 +3,12 @@ from flask_socketio import SocketIO
import time
import os
import threading
from loguru import logger
from blinker import signal
from changedetectionio import strtobool
class SignalHandler:
"""A standalone class to receive signals"""
@@ -15,6 +20,11 @@ class SignalHandler:
from changedetectionio.flask_app import watch_check_update as wcc
wcc.connect(self.handle_signal, weak=False)
logger.info("SignalHandler: Connected to signal from direct import")
# Connect to the queue_length signal
queue_length_signal = signal('queue_length')
queue_length_signal.connect(self.handle_queue_length, weak=False)
logger.info("SignalHandler: Connected to queue_length signal")
def handle_signal(self, *args, **kwargs):
logger.info(f"SignalHandler: Signal received with {len(args)} args and {len(kwargs)} kwargs")
@@ -29,6 +39,21 @@ class SignalHandler:
logger.info(f"Signal handler processed watch UUID {watch_uuid}")
else:
logger.warning(f"Watch UUID {watch_uuid} not found in datastore")
def handle_queue_length(self, *args, **kwargs):
"""Handle queue_length signal and emit to all clients"""
try:
queue_length = kwargs.get('length', 0)
logger.debug(f"SignalHandler: Queue length update received: {queue_length}")
# Emit the queue size to all connected clients
self.socketio_instance.emit("queue_size", {
"q_length": queue_length,
"event_timestamp": time.time()
})
except Exception as e:
logger.error(f"Socket.IO error in handle_queue_length: {str(e)}")
def handle_watch_update(socketio, **kwargs):
@@ -78,18 +103,51 @@ def handle_watch_update(socketio, **kwargs):
logger.error(f"Socket.IO error in handle_watch_update: {str(e)}")
def queue_update_thread(socketio, stop_event):
"""Thread that periodically sends updates for watches in the queue"""
logger.info("Queue update thread started")
# Import the watch_check_update signal and update_q here to avoid circular imports
from changedetectionio.flask_app import watch_check_update, update_q
while not stop_event.is_set():
try:
# For each item in the queue, send a signal
for q_item in update_q.queue:
if hasattr(q_item, 'item') and 'uuid' in q_item.item:
# Send the signal with the watch_uuid parameter
watch_check_update.send(watch_uuid=q_item.item['uuid'])
logger.debug(f"Queue update thread: Sent update for watch {q_item.item['uuid']}")
# Sleep for 3 seconds checking every 0.1 to quit
for _ in range(30): # Check stop_event every 0.1 seconds for 3 seconds total
if stop_event.is_set():
break
time.sleep(0.1)
except Exception as e:
logger.error(f"Error in queue update thread: {str(e)}")
# Sleep a bit to avoid flooding logs in case of persistent error
time.sleep(0.5)
logger.info("Queue update thread stopped")
def init_socketio(app, datastore):
"""Initialize SocketIO with the main Flask app"""
# Use the threading async_mode instead of eventlet
# This avoids the need for monkey patching
# This avoids the need for monkey patching eventlet,
# Which leads to problems with async playwright etc
async_mode = 'threading'
logger.info(f"Using {async_mode} mode for Socket.IO")
# Restrict SocketIO CORS to same origin by default, can be overridden with env var
cors_origins = os.environ.get('SOCKETIO_CORS_ORIGINS', None)
socketio = SocketIO(app,
async_mode=async_mode,
cors_allowed_origins="*",
logger=True,
engineio_logger=True)
cors_allowed_origins=cors_origins, # None means same-origin only
logger=strtobool(os.getenv('SOCKETIO_LOGGING', 'True')),
engineio_logger=strtobool(os.getenv('SOCKETIO_LOGGING', 'True')))
# Set up event handlers
@socketio.on('connect')
@@ -98,6 +156,7 @@ def init_socketio(app, datastore):
from changedetectionio.auth_decorator import login_optionally_required
from flask import request
from flask_login import current_user
from changedetectionio.flask_app import update_q
# Access datastore from socketio
datastore = socketio.datastore
@@ -108,6 +167,17 @@ def init_socketio(app, datastore):
logger.warning("Socket.IO: Rejecting unauthenticated connection")
return False # Reject the connection
# Send the current queue size to the newly connected client
try:
queue_size = update_q.qsize()
socketio.emit("queue_size", {
"q_length": queue_size,
"event_timestamp": time.time()
}, room=request.sid) # Send only to this client
logger.debug(f"Socket.IO: Sent initial queue size {queue_size} to new client")
except Exception as e:
logger.error(f"Socket.IO error sending initial queue size: {str(e)}")
logger.info("Socket.IO: Client connected")
@socketio.on('disconnect')
@@ -120,6 +190,48 @@ def init_socketio(app, datastore):
# Store the datastore reference on the socketio object for later use
socketio.datastore = datastore
# Create a stop event for our queue update thread
stop_event = threading.Event()
socketio.stop_event = stop_event
# Create and start the queue update thread
queue_thread = threading.Thread(
target=queue_update_thread,
args=(socketio, stop_event),
daemon=True,
name="QueueUpdateThread"
)
socketio.queue_thread = queue_thread
queue_thread.start()
logger.info("Queue update thread initialized")
# Add a shutdown method to the socketio object
def shutdown():
"""Shutdown the SocketIO server gracefully"""
try:
logger.info("Socket.IO: Shutting down server...")
# Signal the queue update thread to stop
if hasattr(socketio, 'stop_event'):
socketio.stop_event.set()
logger.info("Socket.IO: Signaled queue update thread to stop")
# Wait for the thread to exit (with timeout)
if hasattr(socketio, 'queue_thread') and socketio.queue_thread.is_alive():
socketio.queue_thread.join(timeout=5.0)
logger.info("Socket.IO: Queue update thread joined" if not socketio.queue_thread.is_alive()
else "Socket.IO: Queue update thread did not exit in time")
# Close any remaining client connections
if hasattr(socketio, 'server'):
socketio.server.disconnect()
logger.info("Socket.IO: Server shutdown complete")
except Exception as e:
logger.error(f"Socket.IO error during shutdown: {str(e)}")
# Attach the shutdown method to the socketio object
socketio.shutdown = shutdown
logger.info("Socket.IO initialized and attached to main Flask app")
return socketio
return socketio

View File

@@ -41,6 +41,11 @@ $(document).ready(function () {
console.log('Socket.IO disconnected');
});
socket.on('queue_size', function (data) {
console.log(`${data.event_timestamp} - Queue size update: ${data.q_length}`);
// Update queue size display if implemented in the UI
})
// Listen for periodically emitted watch data
socket.on('watch_update', function (watch) {
console.log(`${watch.event_timestamp} - Watch update ${watch.uuid} - Checking now - ${watch.checking_now} - UUID in URL ${window.location.href.includes(watch.uuid)}`);