Compare commits

...

17 Commits

Author SHA1 Message Date
dgtlmoon
4c44958ef0 Adding basic TZ info output 2024-11-19 14:37:05 +01:00
dgtlmoon
4e6c783c45 Update COMMERCIAL_LICENCE.md 2024-11-18 16:44:03 +01:00
dgtlmoon
0f0f5af7b5 Ability to disable version check (set DISABLE_VERSION_CHECK=true) Re #2773 (#2775) 2024-11-10 15:48:05 +01:00
dgtlmoon
7fcba26bea Minor improvement for queue management 2024-11-10 11:06:47 +01:00
dgtlmoon
4bda1a234f Update bug_report.md 2024-11-10 10:13:22 +01:00
dgtlmoon
d297850539 Security - Fix test 2024-11-07 20:10:02 +01:00
dgtlmoon
751239250f Security check - improve test 2024-11-07 19:41:48 +01:00
dgtlmoon
6aceeb01ab 0.47.06 2024-11-07 18:47:18 +01:00
dgtlmoon
49bc982c69 CVE-2024-51998 - file:/ path traversal access should not be allowed to access a file without ALLOW_FILE_URI set 2024-11-07 18:45:19 +01:00
Arthur Nogueira Neves
e0abf0b505 Update docker-compose.yml (#2767) 2024-11-06 18:41:55 +01:00
dgtlmoon
f08a1185aa Price tracker - fix for sites that supply an empty additional price (#2758) 2024-11-01 10:56:27 +01:00
dgtlmoon
ad5d7efbbf Testing - Pinning werkzeug (#2757) 2024-11-01 10:23:34 +01:00
dgtlmoon
7029d10f8b 0.47.05 2024-10-31 22:51:03 +01:00
dgtlmoon
26d3a23e05 CVE-2024-51483 - Fix for limiting access to file:// via source:file:///tmp/file.txt when using webdriver/playwright 2024-10-31 22:49:31 +01:00
dgtlmoon
942625e1fb Backups - Hide incomplete/running backups from being downloaded 2024-10-31 10:58:41 +01:00
dgtlmoon
33c83230a6 Backups - Backups now operate in the background, provide a nice UI to access/download previous backups (#2755) 2024-10-31 10:34:59 +01:00
dgtlmoon
87510becb5 Filters - Process all CSS and XPath 'subtract' selectors in a single pass to prevent index shifting and reference loss during DOM manipulation. (#2754) 2024-10-30 12:00:53 +01:00
18 changed files with 450 additions and 101 deletions

View File

@@ -27,6 +27,10 @@ A clear and concise description of what the bug is.
**Version**
*Exact version* in the top right area: 0....
**How did you install?**
Docker, Pip, from source directly etc
**To Reproduce**
Steps to reproduce the behavior:

View File

@@ -4,7 +4,7 @@ In any commercial activity involving 'Hosting' (as defined herein), whether in p
# Commercial License Agreement
This Commercial License Agreement ("Agreement") is entered into by and between Mr Morresi (the original creator of this software) here-in ("Licensor") and (your company or personal name) _____________ ("Licensee"). This Agreement sets forth the terms and conditions under which Licensor provides its software ("Software") and services to Licensee for the purpose of reselling the software either in part or full, as part of any commercial activity where the activity involves a third party.
This Commercial License Agreement ("Agreement") is entered into by and between Web Technologies s.r.o. here-in ("Licensor") and (your company or personal name) _____________ ("Licensee"). This Agreement sets forth the terms and conditions under which Licensor provides its software ("Software") and services to Licensee for the purpose of reselling the software either in part or full, as part of any commercial activity where the activity involves a third party.
### Definition of Hosting

View File

@@ -2,7 +2,7 @@
# Read more https://github.com/dgtlmoon/changedetection.io/wiki
__version__ = '0.47.04'
__version__ = '0.47.06'
from changedetectionio.strtobool import strtobool
from json.decoder import JSONDecodeError

View File

@@ -0,0 +1,164 @@
import datetime
import glob
import threading
from flask import Blueprint, render_template, send_from_directory, flash, url_for, redirect, abort
import os
from changedetectionio.store import ChangeDetectionStore
from changedetectionio.flask_app import login_optionally_required
from loguru import logger
BACKUP_FILENAME_FORMAT = "changedetection-backup-{}.zip"
def create_backup(datastore_path, watches: dict):
logger.debug("Creating backup...")
import zipfile
from pathlib import Path
# create a ZipFile object
timestamp = datetime.datetime.now().strftime("%Y%m%d%H%M%S")
backupname = BACKUP_FILENAME_FORMAT.format(timestamp)
backup_filepath = os.path.join(datastore_path, backupname)
with zipfile.ZipFile(backup_filepath.replace('.zip', '.tmp'), "w",
compression=zipfile.ZIP_DEFLATED,
compresslevel=8) as zipObj:
# Add the index
zipObj.write(os.path.join(datastore_path, "url-watches.json"), arcname="url-watches.json")
# Add the flask app secret
zipObj.write(os.path.join(datastore_path, "secret.txt"), arcname="secret.txt")
# Add any data in the watch data directory.
for uuid, w in watches.items():
for f in Path(w.watch_data_dir).glob('*'):
zipObj.write(f,
# Use the full path to access the file, but make the file 'relative' in the Zip.
arcname=os.path.join(f.parts[-2], f.parts[-1]),
compress_type=zipfile.ZIP_DEFLATED,
compresslevel=8)
# Create a list file with just the URLs, so it's easier to port somewhere else in the future
list_file = "url-list.txt"
with open(os.path.join(datastore_path, list_file), "w") as f:
for uuid in watches:
url = watches[uuid]["url"]
f.write("{}\r\n".format(url))
list_with_tags_file = "url-list-with-tags.txt"
with open(
os.path.join(datastore_path, list_with_tags_file), "w"
) as f:
for uuid in watches:
url = watches[uuid].get('url')
tag = watches[uuid].get('tags', {})
f.write("{} {}\r\n".format(url, tag))
# Add it to the Zip
zipObj.write(
os.path.join(datastore_path, list_file),
arcname=list_file,
compress_type=zipfile.ZIP_DEFLATED,
compresslevel=8,
)
zipObj.write(
os.path.join(datastore_path, list_with_tags_file),
arcname=list_with_tags_file,
compress_type=zipfile.ZIP_DEFLATED,
compresslevel=8,
)
# Now it's done, rename it so it shows up finally and its completed being written.
os.rename(backup_filepath.replace('.zip', '.tmp'), backup_filepath.replace('.tmp', '.zip'))
def construct_blueprint(datastore: ChangeDetectionStore):
backups_blueprint = Blueprint('backups', __name__, template_folder="templates")
backup_threads = []
@login_optionally_required
@backups_blueprint.route("/request-backup", methods=['GET'])
def request_backup():
if any(thread.is_alive() for thread in backup_threads):
flash("A backup is already running, check back in a few minutes", "error")
return redirect(url_for('backups.index'))
if len(find_backups()) > int(os.getenv("MAX_NUMBER_BACKUPS", 100)):
flash("Maximum number of backups reached, please remove some", "error")
return redirect(url_for('backups.index'))
# Be sure we're written fresh
datastore.sync_to_json()
zip_thread = threading.Thread(target=create_backup, args=(datastore.datastore_path, datastore.data.get("watching")))
zip_thread.start()
backup_threads.append(zip_thread)
flash("Backup building in background, check back in a few minutes.")
return redirect(url_for('backups.index'))
def find_backups():
backup_filepath = os.path.join(datastore.datastore_path, BACKUP_FILENAME_FORMAT.format("*"))
backups = glob.glob(backup_filepath)
backup_info = []
for backup in backups:
size = os.path.getsize(backup) / (1024 * 1024)
creation_time = os.path.getctime(backup)
backup_info.append({
'filename': os.path.basename(backup),
'filesize': f"{size:.2f}",
'creation_time': creation_time
})
backup_info.sort(key=lambda x: x['creation_time'], reverse=True)
return backup_info
@login_optionally_required
@backups_blueprint.route("/download/<string:filename>", methods=['GET'])
def download_backup(filename):
import re
filename = filename.strip()
backup_filename_regex = BACKUP_FILENAME_FORMAT.format("\d+")
full_path = os.path.join(os.path.abspath(datastore.datastore_path), filename)
if not full_path.startswith(os.path.abspath(datastore.datastore_path)):
abort(404)
if filename == 'latest':
backups = find_backups()
filename = backups[0]['filename']
if not re.match(r"^" + backup_filename_regex + "$", filename):
abort(400) # Bad Request if the filename doesn't match the pattern
logger.debug(f"Backup download request for '{full_path}'")
return send_from_directory(os.path.abspath(datastore.datastore_path), filename, as_attachment=True)
@login_optionally_required
@backups_blueprint.route("/", methods=['GET'])
def index():
backups = find_backups()
output = render_template("overview.html",
available_backups=backups,
backup_running=any(thread.is_alive() for thread in backup_threads)
)
return output
@login_optionally_required
@backups_blueprint.route("/remove-backups", methods=['GET'])
def remove_backups():
backup_filepath = os.path.join(datastore.datastore_path, BACKUP_FILENAME_FORMAT.format("*"))
backups = glob.glob(backup_filepath)
for backup in backups:
os.unlink(backup)
flash("Backups were deleted.")
return redirect(url_for('backups.index'))
return backups_blueprint

View File

@@ -0,0 +1,36 @@
{% extends 'base.html' %}
{% block content %}
{% from '_helpers.html' import render_simple_field, render_field %}
<div class="edit-form">
<div class="box-wrap inner">
<h4>Backups</h4>
{% if backup_running %}
<p>
<strong>A backup is running!</strong>
</p>
{% endif %}
<p>
Here you can download and request a new backup, when a backup is completed you will see it listed below.
</p>
<br>
{% if available_backups %}
<ul>
{% for backup in available_backups %}
<li><a href="{{ url_for('backups.download_backup', filename=backup["filename"]) }}">{{ backup["filename"] }}</a> {{ backup["filesize"] }} Mb</li>
{% endfor %}
</ul>
{% else %}
<p>
<strong>No backups found.</strong>
</p>
{% endif %}
<a class="pure-button pure-button-primary" href="{{ url_for('backups.request_backup') }}">Create backup</a>
{% if available_backups %}
<a class="pure-button button-small button-error " href="{{ url_for('backups.remove_backups') }}">Remove backups</a>
{% endif %}
</div>
</div>
{% endblock %}

View File

@@ -53,6 +53,7 @@ extra_stylesheets = []
update_q = queue.PriorityQueue()
notification_q = queue.Queue()
MAX_QUEUE_SIZE = 2000
app = Flask(__name__,
static_url_path="",
@@ -83,7 +84,7 @@ csrf = CSRFProtect()
csrf.init_app(app)
notification_debug_log=[]
# get locale ready
# Locale for correct presentation of prices etc
default_locale = locale.getdefaultlocale()
logger.info(f"System locale default is {default_locale}")
try:
@@ -158,6 +159,21 @@ def _jinja2_filter_pagination_slice(arr, skip):
return arr
def app_get_system_time():
from zoneinfo import ZoneInfo # Built-in timezone support in Python 3.9+
system_timezone = datastore.data['settings']['application'].get('timezone')
if not system_timezone:
system_timezone = os.environ.get("TZ")
try:
system_zone = ZoneInfo(system_timezone)
except Exception as e:
logger.warning(f'Warning, unable to use timezone "{system_timezone}" defaulting to UTC- {str(e)}')
system_zone = ZoneInfo("UTC") # Fallback to UTC if the timezone is invalid
return system_zone
@app.template_filter('format_seconds_ago')
def _jinja2_filter_seconds_precise(timestamp):
if timestamp == False:
@@ -242,6 +258,9 @@ def changedetection_app(config=None, datastore_o=None):
# (instead of the global var)
app.config['DATASTORE'] = datastore_o
# Just to check (it will output some debug if not)
app_get_system_time()
login_manager = flask_login.LoginManager(app)
login_manager.login_view = 'login'
app.secret_key = init_app_secret(config['datastore_path'])
@@ -881,6 +900,7 @@ def changedetection_app(config=None, datastore_o=None):
@login_optionally_required
def settings_page():
from changedetectionio import forms
from datetime import datetime
default = deepcopy(datastore.data['settings'])
if datastore.proxy_list is not None:
@@ -948,6 +968,13 @@ def changedetection_app(config=None, datastore_o=None):
else:
flash("An error occurred, please see below.", "error")
system_timezone = app_get_system_time()
system_time = datetime.now(system_timezone)
# Fallback for locale formatting
formatted_system_time = system_time.strftime("%Y-%m-%d %H:%M:%S %Z%z") # Locale-aware time
output = render_template("settings.html",
api_key=datastore.data['settings']['application'].get('api_access_token'),
emailprefix=os.getenv('NOTIFICATION_MAIL_BUTTON_PREFIX', False),
@@ -955,7 +982,9 @@ def changedetection_app(config=None, datastore_o=None):
form=form,
hide_remove_pass=os.getenv("SALTED_PASS", False),
min_system_recheck_seconds=int(os.getenv('MINIMUM_SECONDS_RECHECK_TIME', 3)),
settings_application=datastore.data['settings']['application']
settings_application=datastore.data['settings']['application'],
system_time=formatted_system_time,
timezone_name=system_timezone
)
return output
@@ -1236,78 +1265,6 @@ def changedetection_app(config=None, datastore_o=None):
return output
# We're good but backups are even better!
@app.route("/backup", methods=['GET'])
@login_optionally_required
def get_backup():
import zipfile
from pathlib import Path
# Remove any existing backup file, for now we just keep one file
for previous_backup_filename in Path(datastore_o.datastore_path).rglob('changedetection-backup-*.zip'):
os.unlink(previous_backup_filename)
# create a ZipFile object
timestamp = datetime.datetime.now().strftime("%Y%m%d%H%M%S")
backupname = "changedetection-backup-{}.zip".format(timestamp)
backup_filepath = os.path.join(datastore_o.datastore_path, backupname)
with zipfile.ZipFile(backup_filepath, "w",
compression=zipfile.ZIP_DEFLATED,
compresslevel=8) as zipObj:
# Be sure we're written fresh
datastore.sync_to_json()
# Add the index
zipObj.write(os.path.join(datastore_o.datastore_path, "url-watches.json"), arcname="url-watches.json")
# Add the flask app secret
zipObj.write(os.path.join(datastore_o.datastore_path, "secret.txt"), arcname="secret.txt")
# Add any data in the watch data directory.
for uuid, w in datastore.data['watching'].items():
for f in Path(w.watch_data_dir).glob('*'):
zipObj.write(f,
# Use the full path to access the file, but make the file 'relative' in the Zip.
arcname=os.path.join(f.parts[-2], f.parts[-1]),
compress_type=zipfile.ZIP_DEFLATED,
compresslevel=8)
# Create a list file with just the URLs, so it's easier to port somewhere else in the future
list_file = "url-list.txt"
with open(os.path.join(datastore_o.datastore_path, list_file), "w") as f:
for uuid in datastore.data["watching"]:
url = datastore.data["watching"][uuid]["url"]
f.write("{}\r\n".format(url))
list_with_tags_file = "url-list-with-tags.txt"
with open(
os.path.join(datastore_o.datastore_path, list_with_tags_file), "w"
) as f:
for uuid in datastore.data["watching"]:
url = datastore.data["watching"][uuid].get('url')
tag = datastore.data["watching"][uuid].get('tags', {})
f.write("{} {}\r\n".format(url, tag))
# Add it to the Zip
zipObj.write(
os.path.join(datastore_o.datastore_path, list_file),
arcname=list_file,
compress_type=zipfile.ZIP_DEFLATED,
compresslevel=8,
)
zipObj.write(
os.path.join(datastore_o.datastore_path, list_with_tags_file),
arcname=list_with_tags_file,
compress_type=zipfile.ZIP_DEFLATED,
compresslevel=8,
)
# Send_from_directory needs to be the full absolute path
return send_from_directory(os.path.abspath(datastore_o.datastore_path), backupname, as_attachment=True)
@app.route("/static/<string:group>/<string:filename>", methods=['GET'])
def static_content(group, filename):
from flask import make_response
@@ -1687,13 +1644,16 @@ def changedetection_app(config=None, datastore_o=None):
import changedetectionio.blueprint.check_proxies as check_proxies
app.register_blueprint(check_proxies.construct_blueprint(datastore=datastore), url_prefix='/check_proxy')
import changedetectionio.blueprint.backups as backups
app.register_blueprint(backups.construct_blueprint(datastore), url_prefix='/backups')
# @todo handle ctrl break
ticker_thread = threading.Thread(target=ticker_thread_check_time_launch_checks).start()
threading.Thread(target=notification_runner).start()
# Check for new release version, but not when running in test/build or pytest
if not os.getenv("GITHUB_REF", False) and not config.get('disable_checkver') == True:
if not os.getenv("GITHUB_REF", False) and not strtobool(os.getenv('DISABLE_VERSION_CHECK', 'no')):
threading.Thread(target=check_for_new_version).start()
return app
@@ -1777,7 +1737,6 @@ def notification_runner():
def ticker_thread_check_time_launch_checks():
import random
from changedetectionio import update_worker
proxy_last_called_time = {}
recheck_time_minimum_seconds = int(os.getenv('MINIMUM_SECONDS_RECHECK_TIME', 3))
@@ -1811,12 +1770,14 @@ def ticker_thread_check_time_launch_checks():
except RuntimeError as e:
# RuntimeError: dictionary changed size during iteration
time.sleep(0.1)
watch_uuid_list = []
else:
break
# Re #438 - Don't place more watches in the queue to be checked if the queue is already large
while update_q.qsize() >= 2000:
time.sleep(1)
logger.warning(f"Recheck watches queue size limit reached ({MAX_QUEUE_SIZE}), skipping adding more items")
time.sleep(3)
recheck_time_system_seconds = int(datastore.threshold_seconds)

View File

@@ -54,29 +54,64 @@ def include_filters(include_filters, html_content, append_pretty_line_formatting
def subtractive_css_selector(css_selector, html_content):
from bs4 import BeautifulSoup
soup = BeautifulSoup(html_content, "html.parser")
for item in soup.select(css_selector):
# So that the elements dont shift their index, build a list of elements here which will be pointers to their place in the DOM
elements_to_remove = soup.select(css_selector)
# Then, remove them in a separate loop
for item in elements_to_remove:
item.decompose()
return str(soup)
def subtractive_xpath_selector(xpath_selector, html_content):
def subtractive_xpath_selector(selectors: List[str], html_content: str) -> str:
# Parse the HTML content using lxml
html_tree = etree.HTML(html_content)
elements_to_remove = html_tree.xpath(xpath_selector)
# First, collect all elements to remove
elements_to_remove = []
# Iterate over the list of XPath selectors
for selector in selectors:
# Collect elements for each selector
elements_to_remove.extend(html_tree.xpath(selector))
# Then, remove them in a separate loop
for element in elements_to_remove:
element.getparent().remove(element)
if element.getparent() is not None: # Ensure the element has a parent before removing
element.getparent().remove(element)
# Convert the modified HTML tree back to a string
modified_html = etree.tostring(html_tree, method="html").decode("utf-8")
return modified_html
def element_removal(selectors: List[str], html_content):
"""Removes elements that match a list of CSS or xPath selectors."""
"""Removes elements that match a list of CSS or XPath selectors."""
modified_html = html_content
css_selectors = []
xpath_selectors = []
for selector in selectors:
if selector.startswith(('xpath:', 'xpath1:', '//')):
# Handle XPath selectors separately
xpath_selector = selector.removeprefix('xpath:').removeprefix('xpath1:')
modified_html = subtractive_xpath_selector(xpath_selector, modified_html)
xpath_selectors.append(xpath_selector)
else:
modified_html = subtractive_css_selector(selector, modified_html)
# Collect CSS selectors as one "hit", see comment in subtractive_css_selector
css_selectors.append(selector.strip().strip(","))
if xpath_selectors:
modified_html = subtractive_xpath_selector(xpath_selectors, modified_html)
if css_selectors:
# Remove duplicates, then combine all CSS selectors into one string, separated by commas
# This stops the elements index shifting
unique_selectors = list(set(css_selectors)) # Ensure uniqueness
combined_css_selector = " , ".join(unique_selectors)
modified_html = subtractive_css_selector(combined_css_selector, modified_html)
return modified_html
def elementpath_tostring(obj):

View File

@@ -52,7 +52,8 @@ class model(dict):
'schema_version' : 0,
'shared_diff_access': False,
'webdriver_delay': None , # Extra delay in seconds before extracting text
'tags': {} #@todo use Tag.model initialisers
'tags': {}, #@todo use Tag.model initialisers
'timezone': None,
}
}
}

View File

@@ -89,6 +89,10 @@ class model(watch_base):
if ready_url.startswith('source:'):
ready_url=ready_url.replace('source:', '')
# Also double check it after any Jinja2 formatting just incase
if not is_safe_url(ready_url):
return 'DISABLED'
return ready_url
def clear_watch(self):

View File

@@ -31,15 +31,15 @@ class difference_detection_processor():
from requests.structures import CaseInsensitiveDict
# Protect against file:// access
if re.search(r'^file://', self.watch.get('url', '').strip(), re.IGNORECASE):
url = self.watch.link
# Protect against file://, file:/ access, check the real "link" without any meta "source:" etc prepended.
if re.search(r'^file:/', url.strip(), re.IGNORECASE):
if not strtobool(os.getenv('ALLOW_FILE_URI', 'false')):
raise Exception(
"file:// type access is denied for security reasons."
)
url = self.watch.link
# Requests, playwright, other browser via wss:// etc, fetch_extra_something
prefer_fetch_backend = self.watch.get('fetch_backend', 'system')

View File

@@ -40,7 +40,7 @@ def _deduplicate_prices(data):
if isinstance(datum.value, list):
# Process each item in the list
normalized_value = set([float(re.sub(r'[^\d.]', '', str(item))) for item in datum.value])
normalized_value = set([float(re.sub(r'[^\d.]', '', str(item))) for item in datum.value if str(item).strip()])
unique_data.update(normalized_value)
else:
# Process single value

View File

@@ -70,7 +70,7 @@
<a href="{{ url_for('import_page')}}" class="pure-menu-link">IMPORT</a>
</li>
<li class="pure-menu-item">
<a href="{{ url_for('get_backup')}}" class="pure-menu-link">BACKUP</a>
<a href="{{ url_for('backups.index')}}" class="pure-menu-link">BACKUPS</a>
</li>
{% else %}
<li class="pure-menu-item">

View File

@@ -78,6 +78,10 @@
{{ render_checkbox_field(form.application.form.empty_pages_are_a_change) }}
<span class="pure-form-message-inline">When a request returns no content, or the HTML does not contain any text, is this considered a change?</span>
</div>
<div class="pure-control-group">
<p><strong>Local Time:</strong> {{ system_time }}</p>
<p><strong>Timezone:</strong> {{ timezone_name }}</p>
</div>
{% if form.requests.proxy %}
<div class="pure-control-group inline-radio">
{{ render_field(form.requests.form.proxy, class="fetch-backend-proxy") }}

View File

@@ -26,8 +26,24 @@ def test_backup(client, live_server, measure_memory_usage):
assert b"1 Imported" in res.data
wait_for_all_checks(client)
# Launch the thread in the background to create the backup
res = client.get(
url_for("get_backup"),
url_for("backups.request_backup"),
follow_redirects=True
)
time.sleep(2)
res = client.get(
url_for("backups.index"),
follow_redirects=True
)
# Can see the download link to the backup
assert b'<a href="/backups/download/changedetection-backup-20' in res.data
assert b'Remove backups' in res.data
# Get the latest one
res = client.get(
url_for("backups.download_backup", filename="latest"),
follow_redirects=True
)
@@ -44,3 +60,11 @@ def test_backup(client, live_server, measure_memory_usage):
# Should be two txt files in the archive (history and the snapshot)
assert len(newlist) == 2
# Get the latest one
res = client.get(
url_for("backups.remove_backups"),
follow_redirects=True
)
assert b'No backups found.' in res.data

View File

@@ -11,6 +11,35 @@ from .util import live_server_setup, wait_for_all_checks
def test_setup(live_server):
live_server_setup(live_server)
def set_response_with_multiple_index():
data= """<!DOCTYPE html>
<html>
<body>
<!-- NOTE!! CHROME WILL ADD TBODY HERE IF ITS NOT THERE!! -->
<table style="width:100%">
<tr>
<th>Person 1</th>
<th>Person 2</th>
<th>Person 3</th>
</tr>
<tr>
<td>Emil</td>
<td>Tobias</td>
<td>Linus</td>
</tr>
<tr>
<td>16</td>
<td>14</td>
<td>10</td>
</tr>
</table>
</body>
</html>
"""
with open("test-datastore/endpoint-content.txt", "w") as f:
f.write(data)
def set_original_response():
test_return_data = """<html>
@@ -177,3 +206,61 @@ def test_element_removal_full(client, live_server, measure_memory_usage):
# There should not be an unviewed change, as changes should be removed
res = client.get(url_for("index"))
assert b"unviewed" not in res.data
# Re #2752
def test_element_removal_nth_offset_no_shift(client, live_server, measure_memory_usage):
#live_server_setup(live_server)
set_response_with_multiple_index()
subtractive_selectors_data = ["""
body > table > tr:nth-child(1) > th:nth-child(2)
body > table > tr:nth-child(2) > td:nth-child(2)
body > table > tr:nth-child(3) > td:nth-child(2)
body > table > tr:nth-child(1) > th:nth-child(3)
body > table > tr:nth-child(2) > td:nth-child(3)
body > table > tr:nth-child(3) > td:nth-child(3)""",
"""//body/table/tr[1]/th[2]
//body/table/tr[2]/td[2]
//body/table/tr[3]/td[2]
//body/table/tr[1]/th[3]
//body/table/tr[2]/td[3]
//body/table/tr[3]/td[3]"""]
for selector_list in subtractive_selectors_data:
res = client.get(url_for("form_delete", uuid="all"), follow_redirects=True)
assert b'Deleted' in res.data
# Add our URL to the import page
test_url = url_for("test_endpoint", _external=True)
res = client.post(
url_for("import_page"), data={"urls": test_url}, follow_redirects=True
)
assert b"1 Imported" in res.data
wait_for_all_checks(client)
res = client.post(
url_for("edit_page", uuid="first"),
data={
"subtractive_selectors": selector_list,
"url": test_url,
"tags": "",
"fetch_backend": "html_requests",
},
follow_redirects=True,
)
assert b"Updated watch." in res.data
wait_for_all_checks(client)
res = client.get(
url_for("preview_page", uuid="first"),
follow_redirects=True
)
assert b"Tobias" not in res.data
assert b"Linus" not in res.data
assert b"Person 2" not in res.data
assert b"Person 3" not in res.data
# First column should exist
assert b"Emil" in res.data

View File

@@ -61,10 +61,10 @@ def test_bad_access(client, live_server, measure_memory_usage):
assert b'Watch protocol is not permitted by SAFE_PROTOCOL_REGEX' in res.data
def test_file_access(client, live_server, measure_memory_usage):
def test_file_slashslash_access(client, live_server, measure_memory_usage):
#live_server_setup(live_server)
test_file_path = "/tmp/test-file.txt"
test_file_path = os.path.abspath(__file__)
# file:// is permitted by default, but it will be caught by ALLOW_FILE_URI
client.post(
@@ -82,8 +82,30 @@ def test_file_access(client, live_server, measure_memory_usage):
follow_redirects=True
)
# Should see something (this file added by run_basic_tests.sh)
assert b"Hello world" in res.data
assert b"test_file_slashslash_access" in res.data
else:
# Default should be here
assert b'file:// type access is denied for security reasons.' in res.data
def test_file_slash_access(client, live_server, measure_memory_usage):
#live_server_setup(live_server)
test_file_path = os.path.abspath(__file__)
# file:// is permitted by default, but it will be caught by ALLOW_FILE_URI
client.post(
url_for("form_quick_watch_add"),
data={"url": f"file:/{test_file_path}", "tags": ''},
follow_redirects=True
)
wait_for_all_checks(client)
res = client.get(url_for("index"))
# If it is enabled at test time
if strtobool(os.getenv('ALLOW_FILE_URI', 'false')):
# So it should permit it, but it should fall back to the 'requests' library giving an error
# (but means it gets passed to playwright etc)
assert b"URLs with hostname components are not permitted" in res.data
else:
# Default should be here
assert b'file:// type access is denied for security reasons.' in res.data

View File

@@ -61,6 +61,12 @@ services:
#
# If you want to watch local files file:///path/to/file.txt (careful! security implications!)
# - ALLOW_FILE_URI=False
#
# For complete privacy if you don't want to use the 'check version' / telemetry service
# - DISABLE_VERSION_CHECK=true
#
# A valid timezone name to run as (for scheduling watch checking) see https://en.wikipedia.org/wiki/List_of_tz_database_time_zones
# - TZ=America/Los_Angeles
# Comment out ports: when using behind a reverse proxy , enable networks: etc.
ports:
@@ -74,7 +80,7 @@ services:
# If WEBDRIVER or PLAYWRIGHT are enabled, changedetection container depends on that
# and must wait before starting (substitute "browser-chrome" with "playwright-chrome" if last one is used)
# depends_on:
# playwright-chrome:
# sockpuppetbrowser:
# condition: service_started

View File

@@ -59,7 +59,9 @@ elementpath==4.1.5
selenium~=4.14.0
werkzeug~=3.0
# https://github.com/pallets/werkzeug/issues/2985
# Maybe related to pytest?
werkzeug==3.0.6
# Templating, so far just in the URLs but in the future can be for the notifications also
jinja2~=3.1
@@ -94,4 +96,3 @@ babel
# Needed for > 3.10, https://github.com/microsoft/playwright-python/issues/2096
greenlet >= 3.0.3