From f9387522ee2792d7688d97f3beeab43f36bcd6a8 Mon Sep 17 00:00:00 2001 From: dgtlmoon Date: Mon, 29 May 2023 16:11:43 +0200 Subject: [PATCH] Fetching - Be sure that content-type detection works when the headers are a mixed case (#1604) --- changedetectionio/content_fetcher.py | 7 +++++ .../processors/text_json_diff.py | 8 +++--- .../tests/test_jsonpath_jq_selector.py | 28 ++++++++++++++++++- changedetectionio/tests/util.py | 14 ++++++++-- 4 files changed, 50 insertions(+), 7 deletions(-) diff --git a/changedetectionio/content_fetcher.py b/changedetectionio/content_fetcher.py index 3960f025..d4d593a1 100644 --- a/changedetectionio/content_fetcher.py +++ b/changedetectionio/content_fetcher.py @@ -147,6 +147,13 @@ class Fetcher(): def is_ready(self): return True + def get_all_headers(self): + """ + Get all headers but ensure all keys are lowercase + :return: + """ + return {k.lower(): v for k, v in self.headers.items()} + def iterate_browser_steps(self): from changedetectionio.blueprint.browser_steps.browser_steps import steppable_browser_interface from playwright._impl._api_types import TimeoutError diff --git a/changedetectionio/processors/text_json_diff.py b/changedetectionio/processors/text_json_diff.py index f767703b..e0fc0cd2 100644 --- a/changedetectionio/processors/text_json_diff.py +++ b/changedetectionio/processors/text_json_diff.py @@ -139,7 +139,7 @@ class perform_site_check(difference_detection_processor): self.xpath_data = fetcher.xpath_data # Track the content type - update_obj['content_type'] = fetcher.headers.get('Content-Type', '') + update_obj['content_type'] = fetcher.get_all_headers().get('content-type', '').lower() # Watches added automatically in the queue manager will skip if its the same checksum as the previous run # Saves a lot of CPU @@ -159,7 +159,7 @@ class perform_site_check(difference_detection_processor): # https://stackoverflow.com/questions/41817578/basic-method-chaining ? # return content().textfilter().jsonextract().checksumcompare() ? - is_json = 'application/json' in fetcher.headers.get('Content-Type', '') + is_json = 'application/json' in fetcher.get_all_headers().get('content-type', '').lower() is_html = not is_json # source: support, basically treat it as plaintext @@ -167,7 +167,7 @@ class perform_site_check(difference_detection_processor): is_html = False is_json = False - if watch.is_pdf or 'application/pdf' in fetcher.headers.get('Content-Type', '').lower(): + if watch.is_pdf or 'application/pdf' in fetcher.get_all_headers().get('content-type', '').lower(): from shutil import which tool = os.getenv("PDF_TO_HTML_TOOL", "pdftohtml") if not which(tool): @@ -235,7 +235,7 @@ class perform_site_check(difference_detection_processor): html_content = fetcher.content # If not JSON, and if it's not text/plain.. - if 'text/plain' in fetcher.headers.get('Content-Type', '').lower(): + if 'text/plain' in fetcher.get_all_headers().get('content-type', '').lower(): # Don't run get_text or xpath/css filters on plaintext stripped_text_from_html = html_content else: diff --git a/changedetectionio/tests/test_jsonpath_jq_selector.py b/changedetectionio/tests/test_jsonpath_jq_selector.py index 2a8fd52f..300bbf76 100644 --- a/changedetectionio/tests/test_jsonpath_jq_selector.py +++ b/changedetectionio/tests/test_jsonpath_jq_selector.py @@ -3,7 +3,7 @@ import time from flask import url_for, escape -from . util import live_server_setup +from . util import live_server_setup, wait_for_all_checks import pytest jq_support = True @@ -436,6 +436,32 @@ def test_ignore_json_order(client, live_server): res = client.get(url_for("form_delete", uuid="all"), follow_redirects=True) assert b'Deleted' in res.data +def test_correct_header_detect(client, live_server): + + # Like in https://github.com/dgtlmoon/changedetection.io/pull/1593 + # Specify extra html that JSON is sometimes wrapped in - when using Browserless/Puppeteer etc + with open("test-datastore/endpoint-content.txt", "w") as f: + f.write('{"hello" : 123, "world": 123}') + + # Add our URL to the import page + # Check weird casing is cleaned up and detected also + test_url = url_for('test_endpoint', content_type="aPPlication/JSon", uppercase_headers=True, _external=True) + res = client.post( + url_for("import_page"), + data={"urls": test_url}, + follow_redirects=True + ) + assert b"1 Imported" in res.data + wait_for_all_checks(client) + + + res = client.get(url_for("index")) + # This will be fixed in #1593 + assert b'No parsable JSON found in this document' in res.data + + res = client.get(url_for("form_delete", uuid="all"), follow_redirects=True) + assert b'Deleted' in res.data + def test_check_jsonpath_ext_filter(client, live_server): check_json_ext_filter('json:$[?(@.status==Sold)]', client, live_server) diff --git a/changedetectionio/tests/util.py b/changedetectionio/tests/util.py index e83fdd25..65a3e513 100644 --- a/changedetectionio/tests/util.py +++ b/changedetectionio/tests/util.py @@ -119,16 +119,26 @@ def live_server_setup(live_server): status_code = request.args.get('status_code') content = request.args.get('content') or None + # Used to just try to break the header detection + uppercase_headers = request.args.get('uppercase_headers') + try: if content is not None: resp = make_response(content, status_code) - resp.headers['Content-Type'] = ctype if ctype else 'text/html' + if uppercase_headers: + ctype=ctype.upper() + resp.headers['CONTENT-TYPE'] = ctype if ctype else 'text/html' + else: + resp.headers['Content-Type'] = ctype if ctype else 'text/html' return resp # Tried using a global var here but didn't seem to work, so reading from a file instead. with open("test-datastore/endpoint-content.txt", "r") as f: resp = make_response(f.read(), status_code) - resp.headers['Content-Type'] = ctype if ctype else 'text/html' + if uppercase_headers: + resp.headers['CONTENT-TYPE'] = ctype if ctype else 'text/html' + else: + resp.headers['Content-Type'] = ctype if ctype else 'text/html' return resp except FileNotFoundError: return make_response('', status_code)