Compare commits
14 Commits
0.47.04
...
minor-queu
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
30515c0e9f | ||
|
|
4bda1a234f | ||
|
|
d297850539 | ||
|
|
751239250f | ||
|
|
6aceeb01ab | ||
|
|
49bc982c69 | ||
|
|
e0abf0b505 | ||
|
|
f08a1185aa | ||
|
|
ad5d7efbbf | ||
|
|
7029d10f8b | ||
|
|
26d3a23e05 | ||
|
|
942625e1fb | ||
|
|
33c83230a6 | ||
|
|
87510becb5 |
4
.github/ISSUE_TEMPLATE/bug_report.md
vendored
4
.github/ISSUE_TEMPLATE/bug_report.md
vendored
@@ -27,6 +27,10 @@ A clear and concise description of what the bug is.
|
||||
**Version**
|
||||
*Exact version* in the top right area: 0....
|
||||
|
||||
**How did you install?**
|
||||
|
||||
Docker, Pip, from source directly etc
|
||||
|
||||
**To Reproduce**
|
||||
|
||||
Steps to reproduce the behavior:
|
||||
|
||||
@@ -2,7 +2,7 @@
|
||||
|
||||
# Read more https://github.com/dgtlmoon/changedetection.io/wiki
|
||||
|
||||
__version__ = '0.47.04'
|
||||
__version__ = '0.47.06'
|
||||
|
||||
from changedetectionio.strtobool import strtobool
|
||||
from json.decoder import JSONDecodeError
|
||||
|
||||
164
changedetectionio/blueprint/backups/__init__.py
Normal file
164
changedetectionio/blueprint/backups/__init__.py
Normal file
@@ -0,0 +1,164 @@
|
||||
import datetime
|
||||
import glob
|
||||
import threading
|
||||
|
||||
from flask import Blueprint, render_template, send_from_directory, flash, url_for, redirect, abort
|
||||
import os
|
||||
|
||||
from changedetectionio.store import ChangeDetectionStore
|
||||
from changedetectionio.flask_app import login_optionally_required
|
||||
from loguru import logger
|
||||
|
||||
BACKUP_FILENAME_FORMAT = "changedetection-backup-{}.zip"
|
||||
|
||||
|
||||
def create_backup(datastore_path, watches: dict):
|
||||
logger.debug("Creating backup...")
|
||||
import zipfile
|
||||
from pathlib import Path
|
||||
|
||||
# create a ZipFile object
|
||||
timestamp = datetime.datetime.now().strftime("%Y%m%d%H%M%S")
|
||||
backupname = BACKUP_FILENAME_FORMAT.format(timestamp)
|
||||
backup_filepath = os.path.join(datastore_path, backupname)
|
||||
|
||||
with zipfile.ZipFile(backup_filepath.replace('.zip', '.tmp'), "w",
|
||||
compression=zipfile.ZIP_DEFLATED,
|
||||
compresslevel=8) as zipObj:
|
||||
|
||||
# Add the index
|
||||
zipObj.write(os.path.join(datastore_path, "url-watches.json"), arcname="url-watches.json")
|
||||
|
||||
# Add the flask app secret
|
||||
zipObj.write(os.path.join(datastore_path, "secret.txt"), arcname="secret.txt")
|
||||
|
||||
# Add any data in the watch data directory.
|
||||
for uuid, w in watches.items():
|
||||
for f in Path(w.watch_data_dir).glob('*'):
|
||||
zipObj.write(f,
|
||||
# Use the full path to access the file, but make the file 'relative' in the Zip.
|
||||
arcname=os.path.join(f.parts[-2], f.parts[-1]),
|
||||
compress_type=zipfile.ZIP_DEFLATED,
|
||||
compresslevel=8)
|
||||
|
||||
# Create a list file with just the URLs, so it's easier to port somewhere else in the future
|
||||
list_file = "url-list.txt"
|
||||
with open(os.path.join(datastore_path, list_file), "w") as f:
|
||||
for uuid in watches:
|
||||
url = watches[uuid]["url"]
|
||||
f.write("{}\r\n".format(url))
|
||||
list_with_tags_file = "url-list-with-tags.txt"
|
||||
with open(
|
||||
os.path.join(datastore_path, list_with_tags_file), "w"
|
||||
) as f:
|
||||
for uuid in watches:
|
||||
url = watches[uuid].get('url')
|
||||
tag = watches[uuid].get('tags', {})
|
||||
f.write("{} {}\r\n".format(url, tag))
|
||||
|
||||
# Add it to the Zip
|
||||
zipObj.write(
|
||||
os.path.join(datastore_path, list_file),
|
||||
arcname=list_file,
|
||||
compress_type=zipfile.ZIP_DEFLATED,
|
||||
compresslevel=8,
|
||||
)
|
||||
zipObj.write(
|
||||
os.path.join(datastore_path, list_with_tags_file),
|
||||
arcname=list_with_tags_file,
|
||||
compress_type=zipfile.ZIP_DEFLATED,
|
||||
compresslevel=8,
|
||||
)
|
||||
|
||||
# Now it's done, rename it so it shows up finally and its completed being written.
|
||||
os.rename(backup_filepath.replace('.zip', '.tmp'), backup_filepath.replace('.tmp', '.zip'))
|
||||
|
||||
|
||||
def construct_blueprint(datastore: ChangeDetectionStore):
|
||||
backups_blueprint = Blueprint('backups', __name__, template_folder="templates")
|
||||
backup_threads = []
|
||||
|
||||
@login_optionally_required
|
||||
@backups_blueprint.route("/request-backup", methods=['GET'])
|
||||
def request_backup():
|
||||
if any(thread.is_alive() for thread in backup_threads):
|
||||
flash("A backup is already running, check back in a few minutes", "error")
|
||||
return redirect(url_for('backups.index'))
|
||||
|
||||
if len(find_backups()) > int(os.getenv("MAX_NUMBER_BACKUPS", 100)):
|
||||
flash("Maximum number of backups reached, please remove some", "error")
|
||||
return redirect(url_for('backups.index'))
|
||||
|
||||
# Be sure we're written fresh
|
||||
datastore.sync_to_json()
|
||||
zip_thread = threading.Thread(target=create_backup, args=(datastore.datastore_path, datastore.data.get("watching")))
|
||||
zip_thread.start()
|
||||
backup_threads.append(zip_thread)
|
||||
flash("Backup building in background, check back in a few minutes.")
|
||||
|
||||
return redirect(url_for('backups.index'))
|
||||
|
||||
def find_backups():
|
||||
backup_filepath = os.path.join(datastore.datastore_path, BACKUP_FILENAME_FORMAT.format("*"))
|
||||
backups = glob.glob(backup_filepath)
|
||||
backup_info = []
|
||||
|
||||
for backup in backups:
|
||||
size = os.path.getsize(backup) / (1024 * 1024)
|
||||
creation_time = os.path.getctime(backup)
|
||||
backup_info.append({
|
||||
'filename': os.path.basename(backup),
|
||||
'filesize': f"{size:.2f}",
|
||||
'creation_time': creation_time
|
||||
})
|
||||
|
||||
backup_info.sort(key=lambda x: x['creation_time'], reverse=True)
|
||||
|
||||
return backup_info
|
||||
|
||||
@login_optionally_required
|
||||
@backups_blueprint.route("/download/<string:filename>", methods=['GET'])
|
||||
def download_backup(filename):
|
||||
import re
|
||||
filename = filename.strip()
|
||||
backup_filename_regex = BACKUP_FILENAME_FORMAT.format("\d+")
|
||||
|
||||
full_path = os.path.join(os.path.abspath(datastore.datastore_path), filename)
|
||||
if not full_path.startswith(os.path.abspath(datastore.datastore_path)):
|
||||
abort(404)
|
||||
|
||||
if filename == 'latest':
|
||||
backups = find_backups()
|
||||
filename = backups[0]['filename']
|
||||
|
||||
if not re.match(r"^" + backup_filename_regex + "$", filename):
|
||||
abort(400) # Bad Request if the filename doesn't match the pattern
|
||||
|
||||
logger.debug(f"Backup download request for '{full_path}'")
|
||||
return send_from_directory(os.path.abspath(datastore.datastore_path), filename, as_attachment=True)
|
||||
|
||||
@login_optionally_required
|
||||
@backups_blueprint.route("/", methods=['GET'])
|
||||
def index():
|
||||
backups = find_backups()
|
||||
output = render_template("overview.html",
|
||||
available_backups=backups,
|
||||
backup_running=any(thread.is_alive() for thread in backup_threads)
|
||||
)
|
||||
|
||||
return output
|
||||
|
||||
@login_optionally_required
|
||||
@backups_blueprint.route("/remove-backups", methods=['GET'])
|
||||
def remove_backups():
|
||||
|
||||
backup_filepath = os.path.join(datastore.datastore_path, BACKUP_FILENAME_FORMAT.format("*"))
|
||||
backups = glob.glob(backup_filepath)
|
||||
for backup in backups:
|
||||
os.unlink(backup)
|
||||
|
||||
flash("Backups were deleted.")
|
||||
|
||||
return redirect(url_for('backups.index'))
|
||||
|
||||
return backups_blueprint
|
||||
36
changedetectionio/blueprint/backups/templates/overview.html
Normal file
36
changedetectionio/blueprint/backups/templates/overview.html
Normal file
@@ -0,0 +1,36 @@
|
||||
{% extends 'base.html' %}
|
||||
{% block content %}
|
||||
{% from '_helpers.html' import render_simple_field, render_field %}
|
||||
<div class="edit-form">
|
||||
<div class="box-wrap inner">
|
||||
<h4>Backups</h4>
|
||||
{% if backup_running %}
|
||||
<p>
|
||||
<strong>A backup is running!</strong>
|
||||
</p>
|
||||
{% endif %}
|
||||
<p>
|
||||
Here you can download and request a new backup, when a backup is completed you will see it listed below.
|
||||
</p>
|
||||
<br>
|
||||
{% if available_backups %}
|
||||
<ul>
|
||||
{% for backup in available_backups %}
|
||||
<li><a href="{{ url_for('backups.download_backup', filename=backup["filename"]) }}">{{ backup["filename"] }}</a> {{ backup["filesize"] }} Mb</li>
|
||||
{% endfor %}
|
||||
</ul>
|
||||
{% else %}
|
||||
<p>
|
||||
<strong>No backups found.</strong>
|
||||
</p>
|
||||
{% endif %}
|
||||
|
||||
<a class="pure-button pure-button-primary" href="{{ url_for('backups.request_backup') }}">Create backup</a>
|
||||
{% if available_backups %}
|
||||
<a class="pure-button button-small button-error " href="{{ url_for('backups.remove_backups') }}">Remove backups</a>
|
||||
{% endif %}
|
||||
</div>
|
||||
</div>
|
||||
|
||||
|
||||
{% endblock %}
|
||||
@@ -53,6 +53,7 @@ extra_stylesheets = []
|
||||
|
||||
update_q = queue.PriorityQueue()
|
||||
notification_q = queue.Queue()
|
||||
MAX_QUEUE_SIZE = 2000
|
||||
|
||||
app = Flask(__name__,
|
||||
static_url_path="",
|
||||
@@ -1236,78 +1237,6 @@ def changedetection_app(config=None, datastore_o=None):
|
||||
|
||||
return output
|
||||
|
||||
# We're good but backups are even better!
|
||||
@app.route("/backup", methods=['GET'])
|
||||
@login_optionally_required
|
||||
def get_backup():
|
||||
|
||||
import zipfile
|
||||
from pathlib import Path
|
||||
|
||||
# Remove any existing backup file, for now we just keep one file
|
||||
|
||||
for previous_backup_filename in Path(datastore_o.datastore_path).rglob('changedetection-backup-*.zip'):
|
||||
os.unlink(previous_backup_filename)
|
||||
|
||||
# create a ZipFile object
|
||||
timestamp = datetime.datetime.now().strftime("%Y%m%d%H%M%S")
|
||||
backupname = "changedetection-backup-{}.zip".format(timestamp)
|
||||
backup_filepath = os.path.join(datastore_o.datastore_path, backupname)
|
||||
|
||||
with zipfile.ZipFile(backup_filepath, "w",
|
||||
compression=zipfile.ZIP_DEFLATED,
|
||||
compresslevel=8) as zipObj:
|
||||
|
||||
# Be sure we're written fresh
|
||||
datastore.sync_to_json()
|
||||
|
||||
# Add the index
|
||||
zipObj.write(os.path.join(datastore_o.datastore_path, "url-watches.json"), arcname="url-watches.json")
|
||||
|
||||
# Add the flask app secret
|
||||
zipObj.write(os.path.join(datastore_o.datastore_path, "secret.txt"), arcname="secret.txt")
|
||||
|
||||
# Add any data in the watch data directory.
|
||||
for uuid, w in datastore.data['watching'].items():
|
||||
for f in Path(w.watch_data_dir).glob('*'):
|
||||
zipObj.write(f,
|
||||
# Use the full path to access the file, but make the file 'relative' in the Zip.
|
||||
arcname=os.path.join(f.parts[-2], f.parts[-1]),
|
||||
compress_type=zipfile.ZIP_DEFLATED,
|
||||
compresslevel=8)
|
||||
|
||||
# Create a list file with just the URLs, so it's easier to port somewhere else in the future
|
||||
list_file = "url-list.txt"
|
||||
with open(os.path.join(datastore_o.datastore_path, list_file), "w") as f:
|
||||
for uuid in datastore.data["watching"]:
|
||||
url = datastore.data["watching"][uuid]["url"]
|
||||
f.write("{}\r\n".format(url))
|
||||
list_with_tags_file = "url-list-with-tags.txt"
|
||||
with open(
|
||||
os.path.join(datastore_o.datastore_path, list_with_tags_file), "w"
|
||||
) as f:
|
||||
for uuid in datastore.data["watching"]:
|
||||
url = datastore.data["watching"][uuid].get('url')
|
||||
tag = datastore.data["watching"][uuid].get('tags', {})
|
||||
f.write("{} {}\r\n".format(url, tag))
|
||||
|
||||
# Add it to the Zip
|
||||
zipObj.write(
|
||||
os.path.join(datastore_o.datastore_path, list_file),
|
||||
arcname=list_file,
|
||||
compress_type=zipfile.ZIP_DEFLATED,
|
||||
compresslevel=8,
|
||||
)
|
||||
zipObj.write(
|
||||
os.path.join(datastore_o.datastore_path, list_with_tags_file),
|
||||
arcname=list_with_tags_file,
|
||||
compress_type=zipfile.ZIP_DEFLATED,
|
||||
compresslevel=8,
|
||||
)
|
||||
|
||||
# Send_from_directory needs to be the full absolute path
|
||||
return send_from_directory(os.path.abspath(datastore_o.datastore_path), backupname, as_attachment=True)
|
||||
|
||||
@app.route("/static/<string:group>/<string:filename>", methods=['GET'])
|
||||
def static_content(group, filename):
|
||||
from flask import make_response
|
||||
@@ -1687,6 +1616,9 @@ def changedetection_app(config=None, datastore_o=None):
|
||||
import changedetectionio.blueprint.check_proxies as check_proxies
|
||||
app.register_blueprint(check_proxies.construct_blueprint(datastore=datastore), url_prefix='/check_proxy')
|
||||
|
||||
import changedetectionio.blueprint.backups as backups
|
||||
app.register_blueprint(backups.construct_blueprint(datastore), url_prefix='/backups')
|
||||
|
||||
|
||||
# @todo handle ctrl break
|
||||
ticker_thread = threading.Thread(target=ticker_thread_check_time_launch_checks).start()
|
||||
@@ -1811,12 +1743,14 @@ def ticker_thread_check_time_launch_checks():
|
||||
except RuntimeError as e:
|
||||
# RuntimeError: dictionary changed size during iteration
|
||||
time.sleep(0.1)
|
||||
watch_uuid_list = []
|
||||
else:
|
||||
break
|
||||
|
||||
# Re #438 - Don't place more watches in the queue to be checked if the queue is already large
|
||||
while update_q.qsize() >= 2000:
|
||||
time.sleep(1)
|
||||
logger.warning(f"Recheck watches queue size limit reached ({MAX_QUEUE_SIZE}), skipping adding more items")
|
||||
time.sleep(3)
|
||||
|
||||
|
||||
recheck_time_system_seconds = int(datastore.threshold_seconds)
|
||||
|
||||
@@ -54,29 +54,64 @@ def include_filters(include_filters, html_content, append_pretty_line_formatting
|
||||
def subtractive_css_selector(css_selector, html_content):
|
||||
from bs4 import BeautifulSoup
|
||||
soup = BeautifulSoup(html_content, "html.parser")
|
||||
for item in soup.select(css_selector):
|
||||
|
||||
# So that the elements dont shift their index, build a list of elements here which will be pointers to their place in the DOM
|
||||
elements_to_remove = soup.select(css_selector)
|
||||
|
||||
# Then, remove them in a separate loop
|
||||
for item in elements_to_remove:
|
||||
item.decompose()
|
||||
|
||||
return str(soup)
|
||||
|
||||
def subtractive_xpath_selector(xpath_selector, html_content):
|
||||
def subtractive_xpath_selector(selectors: List[str], html_content: str) -> str:
|
||||
# Parse the HTML content using lxml
|
||||
html_tree = etree.HTML(html_content)
|
||||
elements_to_remove = html_tree.xpath(xpath_selector)
|
||||
|
||||
# First, collect all elements to remove
|
||||
elements_to_remove = []
|
||||
|
||||
# Iterate over the list of XPath selectors
|
||||
for selector in selectors:
|
||||
# Collect elements for each selector
|
||||
elements_to_remove.extend(html_tree.xpath(selector))
|
||||
|
||||
# Then, remove them in a separate loop
|
||||
for element in elements_to_remove:
|
||||
element.getparent().remove(element)
|
||||
if element.getparent() is not None: # Ensure the element has a parent before removing
|
||||
element.getparent().remove(element)
|
||||
|
||||
# Convert the modified HTML tree back to a string
|
||||
modified_html = etree.tostring(html_tree, method="html").decode("utf-8")
|
||||
return modified_html
|
||||
|
||||
|
||||
def element_removal(selectors: List[str], html_content):
|
||||
"""Removes elements that match a list of CSS or xPath selectors."""
|
||||
"""Removes elements that match a list of CSS or XPath selectors."""
|
||||
modified_html = html_content
|
||||
css_selectors = []
|
||||
xpath_selectors = []
|
||||
|
||||
for selector in selectors:
|
||||
if selector.startswith(('xpath:', 'xpath1:', '//')):
|
||||
# Handle XPath selectors separately
|
||||
xpath_selector = selector.removeprefix('xpath:').removeprefix('xpath1:')
|
||||
modified_html = subtractive_xpath_selector(xpath_selector, modified_html)
|
||||
xpath_selectors.append(xpath_selector)
|
||||
else:
|
||||
modified_html = subtractive_css_selector(selector, modified_html)
|
||||
# Collect CSS selectors as one "hit", see comment in subtractive_css_selector
|
||||
css_selectors.append(selector.strip().strip(","))
|
||||
|
||||
if xpath_selectors:
|
||||
modified_html = subtractive_xpath_selector(xpath_selectors, modified_html)
|
||||
|
||||
if css_selectors:
|
||||
# Remove duplicates, then combine all CSS selectors into one string, separated by commas
|
||||
# This stops the elements index shifting
|
||||
unique_selectors = list(set(css_selectors)) # Ensure uniqueness
|
||||
combined_css_selector = " , ".join(unique_selectors)
|
||||
modified_html = subtractive_css_selector(combined_css_selector, modified_html)
|
||||
|
||||
|
||||
return modified_html
|
||||
|
||||
def elementpath_tostring(obj):
|
||||
|
||||
@@ -89,6 +89,10 @@ class model(watch_base):
|
||||
|
||||
if ready_url.startswith('source:'):
|
||||
ready_url=ready_url.replace('source:', '')
|
||||
|
||||
# Also double check it after any Jinja2 formatting just incase
|
||||
if not is_safe_url(ready_url):
|
||||
return 'DISABLED'
|
||||
return ready_url
|
||||
|
||||
def clear_watch(self):
|
||||
|
||||
@@ -31,15 +31,15 @@ class difference_detection_processor():
|
||||
|
||||
from requests.structures import CaseInsensitiveDict
|
||||
|
||||
# Protect against file:// access
|
||||
if re.search(r'^file://', self.watch.get('url', '').strip(), re.IGNORECASE):
|
||||
url = self.watch.link
|
||||
|
||||
# Protect against file://, file:/ access, check the real "link" without any meta "source:" etc prepended.
|
||||
if re.search(r'^file:/', url.strip(), re.IGNORECASE):
|
||||
if not strtobool(os.getenv('ALLOW_FILE_URI', 'false')):
|
||||
raise Exception(
|
||||
"file:// type access is denied for security reasons."
|
||||
)
|
||||
|
||||
url = self.watch.link
|
||||
|
||||
# Requests, playwright, other browser via wss:// etc, fetch_extra_something
|
||||
prefer_fetch_backend = self.watch.get('fetch_backend', 'system')
|
||||
|
||||
|
||||
@@ -40,7 +40,7 @@ def _deduplicate_prices(data):
|
||||
|
||||
if isinstance(datum.value, list):
|
||||
# Process each item in the list
|
||||
normalized_value = set([float(re.sub(r'[^\d.]', '', str(item))) for item in datum.value])
|
||||
normalized_value = set([float(re.sub(r'[^\d.]', '', str(item))) for item in datum.value if str(item).strip()])
|
||||
unique_data.update(normalized_value)
|
||||
else:
|
||||
# Process single value
|
||||
|
||||
@@ -70,7 +70,7 @@
|
||||
<a href="{{ url_for('import_page')}}" class="pure-menu-link">IMPORT</a>
|
||||
</li>
|
||||
<li class="pure-menu-item">
|
||||
<a href="{{ url_for('get_backup')}}" class="pure-menu-link">BACKUP</a>
|
||||
<a href="{{ url_for('backups.index')}}" class="pure-menu-link">BACKUPS</a>
|
||||
</li>
|
||||
{% else %}
|
||||
<li class="pure-menu-item">
|
||||
|
||||
@@ -26,8 +26,24 @@ def test_backup(client, live_server, measure_memory_usage):
|
||||
assert b"1 Imported" in res.data
|
||||
wait_for_all_checks(client)
|
||||
|
||||
# Launch the thread in the background to create the backup
|
||||
res = client.get(
|
||||
url_for("get_backup"),
|
||||
url_for("backups.request_backup"),
|
||||
follow_redirects=True
|
||||
)
|
||||
time.sleep(2)
|
||||
|
||||
res = client.get(
|
||||
url_for("backups.index"),
|
||||
follow_redirects=True
|
||||
)
|
||||
# Can see the download link to the backup
|
||||
assert b'<a href="/backups/download/changedetection-backup-20' in res.data
|
||||
assert b'Remove backups' in res.data
|
||||
|
||||
# Get the latest one
|
||||
res = client.get(
|
||||
url_for("backups.download_backup", filename="latest"),
|
||||
follow_redirects=True
|
||||
)
|
||||
|
||||
@@ -44,3 +60,11 @@ def test_backup(client, live_server, measure_memory_usage):
|
||||
|
||||
# Should be two txt files in the archive (history and the snapshot)
|
||||
assert len(newlist) == 2
|
||||
|
||||
# Get the latest one
|
||||
res = client.get(
|
||||
url_for("backups.remove_backups"),
|
||||
follow_redirects=True
|
||||
)
|
||||
|
||||
assert b'No backups found.' in res.data
|
||||
@@ -11,6 +11,35 @@ from .util import live_server_setup, wait_for_all_checks
|
||||
def test_setup(live_server):
|
||||
live_server_setup(live_server)
|
||||
|
||||
def set_response_with_multiple_index():
|
||||
data= """<!DOCTYPE html>
|
||||
<html>
|
||||
<body>
|
||||
|
||||
<!-- NOTE!! CHROME WILL ADD TBODY HERE IF ITS NOT THERE!! -->
|
||||
<table style="width:100%">
|
||||
<tr>
|
||||
<th>Person 1</th>
|
||||
<th>Person 2</th>
|
||||
<th>Person 3</th>
|
||||
</tr>
|
||||
<tr>
|
||||
<td>Emil</td>
|
||||
<td>Tobias</td>
|
||||
<td>Linus</td>
|
||||
</tr>
|
||||
<tr>
|
||||
<td>16</td>
|
||||
<td>14</td>
|
||||
<td>10</td>
|
||||
</tr>
|
||||
</table>
|
||||
</body>
|
||||
</html>
|
||||
"""
|
||||
with open("test-datastore/endpoint-content.txt", "w") as f:
|
||||
f.write(data)
|
||||
|
||||
|
||||
def set_original_response():
|
||||
test_return_data = """<html>
|
||||
@@ -177,3 +206,61 @@ def test_element_removal_full(client, live_server, measure_memory_usage):
|
||||
# There should not be an unviewed change, as changes should be removed
|
||||
res = client.get(url_for("index"))
|
||||
assert b"unviewed" not in res.data
|
||||
|
||||
# Re #2752
|
||||
def test_element_removal_nth_offset_no_shift(client, live_server, measure_memory_usage):
|
||||
#live_server_setup(live_server)
|
||||
|
||||
set_response_with_multiple_index()
|
||||
subtractive_selectors_data = ["""
|
||||
body > table > tr:nth-child(1) > th:nth-child(2)
|
||||
body > table > tr:nth-child(2) > td:nth-child(2)
|
||||
body > table > tr:nth-child(3) > td:nth-child(2)
|
||||
body > table > tr:nth-child(1) > th:nth-child(3)
|
||||
body > table > tr:nth-child(2) > td:nth-child(3)
|
||||
body > table > tr:nth-child(3) > td:nth-child(3)""",
|
||||
"""//body/table/tr[1]/th[2]
|
||||
//body/table/tr[2]/td[2]
|
||||
//body/table/tr[3]/td[2]
|
||||
//body/table/tr[1]/th[3]
|
||||
//body/table/tr[2]/td[3]
|
||||
//body/table/tr[3]/td[3]"""]
|
||||
|
||||
for selector_list in subtractive_selectors_data:
|
||||
|
||||
res = client.get(url_for("form_delete", uuid="all"), follow_redirects=True)
|
||||
assert b'Deleted' in res.data
|
||||
|
||||
# Add our URL to the import page
|
||||
test_url = url_for("test_endpoint", _external=True)
|
||||
res = client.post(
|
||||
url_for("import_page"), data={"urls": test_url}, follow_redirects=True
|
||||
)
|
||||
assert b"1 Imported" in res.data
|
||||
wait_for_all_checks(client)
|
||||
|
||||
res = client.post(
|
||||
url_for("edit_page", uuid="first"),
|
||||
data={
|
||||
"subtractive_selectors": selector_list,
|
||||
"url": test_url,
|
||||
"tags": "",
|
||||
"fetch_backend": "html_requests",
|
||||
},
|
||||
follow_redirects=True,
|
||||
)
|
||||
assert b"Updated watch." in res.data
|
||||
wait_for_all_checks(client)
|
||||
|
||||
res = client.get(
|
||||
url_for("preview_page", uuid="first"),
|
||||
follow_redirects=True
|
||||
)
|
||||
|
||||
assert b"Tobias" not in res.data
|
||||
assert b"Linus" not in res.data
|
||||
assert b"Person 2" not in res.data
|
||||
assert b"Person 3" not in res.data
|
||||
# First column should exist
|
||||
assert b"Emil" in res.data
|
||||
|
||||
|
||||
@@ -61,10 +61,10 @@ def test_bad_access(client, live_server, measure_memory_usage):
|
||||
assert b'Watch protocol is not permitted by SAFE_PROTOCOL_REGEX' in res.data
|
||||
|
||||
|
||||
def test_file_access(client, live_server, measure_memory_usage):
|
||||
def test_file_slashslash_access(client, live_server, measure_memory_usage):
|
||||
#live_server_setup(live_server)
|
||||
|
||||
test_file_path = "/tmp/test-file.txt"
|
||||
test_file_path = os.path.abspath(__file__)
|
||||
|
||||
# file:// is permitted by default, but it will be caught by ALLOW_FILE_URI
|
||||
client.post(
|
||||
@@ -82,8 +82,30 @@ def test_file_access(client, live_server, measure_memory_usage):
|
||||
follow_redirects=True
|
||||
)
|
||||
|
||||
# Should see something (this file added by run_basic_tests.sh)
|
||||
assert b"Hello world" in res.data
|
||||
assert b"test_file_slashslash_access" in res.data
|
||||
else:
|
||||
# Default should be here
|
||||
assert b'file:// type access is denied for security reasons.' in res.data
|
||||
|
||||
def test_file_slash_access(client, live_server, measure_memory_usage):
|
||||
#live_server_setup(live_server)
|
||||
|
||||
test_file_path = os.path.abspath(__file__)
|
||||
|
||||
# file:// is permitted by default, but it will be caught by ALLOW_FILE_URI
|
||||
client.post(
|
||||
url_for("form_quick_watch_add"),
|
||||
data={"url": f"file:/{test_file_path}", "tags": ''},
|
||||
follow_redirects=True
|
||||
)
|
||||
wait_for_all_checks(client)
|
||||
res = client.get(url_for("index"))
|
||||
|
||||
# If it is enabled at test time
|
||||
if strtobool(os.getenv('ALLOW_FILE_URI', 'false')):
|
||||
# So it should permit it, but it should fall back to the 'requests' library giving an error
|
||||
# (but means it gets passed to playwright etc)
|
||||
assert b"URLs with hostname components are not permitted" in res.data
|
||||
else:
|
||||
# Default should be here
|
||||
assert b'file:// type access is denied for security reasons.' in res.data
|
||||
|
||||
@@ -74,7 +74,7 @@ services:
|
||||
# If WEBDRIVER or PLAYWRIGHT are enabled, changedetection container depends on that
|
||||
# and must wait before starting (substitute "browser-chrome" with "playwright-chrome" if last one is used)
|
||||
# depends_on:
|
||||
# playwright-chrome:
|
||||
# sockpuppetbrowser:
|
||||
# condition: service_started
|
||||
|
||||
|
||||
|
||||
@@ -59,7 +59,9 @@ elementpath==4.1.5
|
||||
|
||||
selenium~=4.14.0
|
||||
|
||||
werkzeug~=3.0
|
||||
# https://github.com/pallets/werkzeug/issues/2985
|
||||
# Maybe related to pytest?
|
||||
werkzeug==3.0.6
|
||||
|
||||
# Templating, so far just in the URLs but in the future can be for the notifications also
|
||||
jinja2~=3.1
|
||||
|
||||
Reference in New Issue
Block a user