diff --git a/changedetectionio/tests/test_history_consistency.py b/changedetectionio/tests/test_history_consistency.py index 997342fd..e260418a 100644 --- a/changedetectionio/tests/test_history_consistency.py +++ b/changedetectionio/tests/test_history_consistency.py @@ -50,6 +50,9 @@ def test_consistent_history(client, live_server, measure_memory_usage): i=0 for w in json_obj['watching'].keys(): i+1 +# res = client.get(url_for("watchlist.index")) +# with open('/tmp/debug.html', 'wb') as f: +# f.write(res.data) history_txt_index_file = os.path.join(live_server.app.config['DATASTORE'].datastore_path, w, 'history.txt') assert os.path.isfile(history_txt_index_file), f"history.txt for i: {i} should exist where I expect it at {history_txt_index_file}" diff --git a/changedetectionio/tests/util.py b/changedetectionio/tests/util.py index cf6e2da3..da255089 100644 --- a/changedetectionio/tests/util.py +++ b/changedetectionio/tests/util.py @@ -127,22 +127,49 @@ def extract_UUID_from_client(client): return uuid.strip() def wait_for_all_checks(client=None): - # actually this is not entirely true, it can still be 'processing' but not in the queue - # Loop waiting until done.. - attempt=0 - # because sub-second rechecks are problematic in testing, use lots of delays - time.sleep(1.2) - - from changedetectionio.flask_app import update_q as global_update_q + """ + Waits until the queue is empty and remains empty for at least `required_empty_duration` seconds, + and also ensures no running threads have `current_uuid` set. + Retries for up to `max_attempts` times, sleeping `wait_between_attempts` seconds between checks. + """ + from changedetectionio.flask_app import update_q as global_update_q, running_update_threads - while attempt < 60: - # Get queue size safely - update_q is a SignalPriorityQueue + # Configuration + attempt = 0 + max_attempts = 60 + wait_between_attempts = 2 + required_empty_duration = 2 + + logger = logging.getLogger() + time.sleep(1.2) + + empty_since = None + + while attempt < max_attempts: q_length = global_update_q.qsize() - if not q_length: - logging.getLogger().info(f"queue empty at attempt {attempt}") - break - logging.getLogger().info(f"Waiting for empty queue.... {attempt}") - time.sleep(2) + + # Check if any threads are still processing + time.sleep(1.2) + any_threads_busy = any(t.current_uuid for t in running_update_threads) + + + if q_length == 0 and not any_threads_busy: + if empty_since is None: + empty_since = time.time() + logger.info(f"Queue empty and no active threads at attempt {attempt}, starting empty timer...") + elif time.time() - empty_since >= required_empty_duration: + logger.info(f"Queue has been empty and threads idle for {required_empty_duration} seconds. Done waiting.") + break + else: + logger.info(f"Still waiting: queue empty and no active threads, but not yet {required_empty_duration} seconds...") + else: + if q_length != 0: + logger.info(f"Queue not empty (size={q_length}), resetting timer.") + if any_threads_busy: + busy_threads = [t.name for t in running_update_threads if t.current_uuid] + logger.info(f"Threads still busy: {busy_threads}, resetting timer.") + empty_since = None + attempt += 1 time.sleep(1.2)