mirror of
https://github.com/Second-Hand-Friends/kleinanzeigen-bot.git
synced 2026-03-12 10:31:50 +01:00
feat: enhanced folder naming (#599)
This commit is contained in:
@@ -275,6 +275,13 @@ categories:
|
|||||||
Verschenken & Tauschen > Verleihen: 272/274
|
Verschenken & Tauschen > Verleihen: 272/274
|
||||||
Verschenken & Tauschen > Verschenken: 272/192
|
Verschenken & Tauschen > Verschenken: 272/192
|
||||||
|
|
||||||
|
# download configuration
|
||||||
|
download:
|
||||||
|
include_all_matching_shipping_options: false # if true, all shipping options matching the package size will be included
|
||||||
|
excluded_shipping_options: [] # list of shipping options to exclude, e.g. ['DHL_2', 'DHL_5']
|
||||||
|
folder_name_max_length: 100 # maximum length for folder names when downloading ads (default: 100)
|
||||||
|
rename_existing_folders: false # if true, rename existing folders without titles to include titles (default: false)
|
||||||
|
|
||||||
# publishing configuration
|
# publishing configuration
|
||||||
publishing:
|
publishing:
|
||||||
delete_old_ads: "AFTER_PUBLISH" # one of: AFTER_PUBLISH, BEFORE_PUBLISH, NEVER
|
delete_old_ads: "AFTER_PUBLISH" # one of: AFTER_PUBLISH, BEFORE_PUBLISH, NEVER
|
||||||
|
|||||||
13
pdm.lock
generated
13
pdm.lock
generated
@@ -5,7 +5,7 @@
|
|||||||
groups = ["default", "dev"]
|
groups = ["default", "dev"]
|
||||||
strategy = ["inherit_metadata"]
|
strategy = ["inherit_metadata"]
|
||||||
lock_version = "4.5.0"
|
lock_version = "4.5.0"
|
||||||
content_hash = "sha256:25eeef987c3fa08a52036fd696587f2fb89c6474225d7c9108e5d0281aa54d26"
|
content_hash = "sha256:3bda32de316794f1c608898e17874857e2263ee1f3a5932440c630366cc40af2"
|
||||||
|
|
||||||
[[metadata.targets]]
|
[[metadata.targets]]
|
||||||
requires_python = ">=3.10,<3.14"
|
requires_python = ">=3.10,<3.14"
|
||||||
@@ -1324,6 +1324,17 @@ files = [
|
|||||||
{file = "ruyaml-0.91.0.tar.gz", hash = "sha256:6ce9de9f4d082d696d3bde264664d1bcdca8f5a9dff9d1a1f1a127969ab871ab"},
|
{file = "ruyaml-0.91.0.tar.gz", hash = "sha256:6ce9de9f4d082d696d3bde264664d1bcdca8f5a9dff9d1a1f1a127969ab871ab"},
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "sanitize-filename"
|
||||||
|
version = "1.2.0"
|
||||||
|
requires_python = "~=3.7"
|
||||||
|
summary = "A permissive filename sanitizer."
|
||||||
|
groups = ["default"]
|
||||||
|
files = [
|
||||||
|
{file = "sanitize_filename-1.2.0-py3-none-any.whl", hash = "sha256:a5be41a4371c84cb4a666a9c3baa70e1b2086a3e50b86c7ba5dd579f5ad2f330"},
|
||||||
|
{file = "sanitize_filename-1.2.0.tar.gz", hash = "sha256:e75933e96d426e306eef8c270cc24c3e1971d8715288c9776d801d3d8e7b941a"},
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "setuptools"
|
name = "setuptools"
|
||||||
version = "80.9.0"
|
version = "80.9.0"
|
||||||
|
|||||||
@@ -43,6 +43,7 @@ dependencies = [
|
|||||||
"ruamel.yaml",
|
"ruamel.yaml",
|
||||||
"psutil",
|
"psutil",
|
||||||
"wcmatch",
|
"wcmatch",
|
||||||
|
"sanitize-filename>=1.2.0",
|
||||||
]
|
]
|
||||||
|
|
||||||
[dependency-groups] # https://peps.python.org/pep-0735/
|
[dependency-groups] # https://peps.python.org/pep-0735/
|
||||||
|
|||||||
@@ -291,6 +291,20 @@
|
|||||||
},
|
},
|
||||||
"title": "Excluded Shipping Options",
|
"title": "Excluded Shipping Options",
|
||||||
"type": "array"
|
"type": "array"
|
||||||
|
},
|
||||||
|
"folder_name_max_length": {
|
||||||
|
"default": 100,
|
||||||
|
"description": "maximum length for folder names when downloading ads (default: 100)",
|
||||||
|
"maximum": 255,
|
||||||
|
"minimum": 10,
|
||||||
|
"title": "Folder Name Max Length",
|
||||||
|
"type": "integer"
|
||||||
|
},
|
||||||
|
"rename_existing_folders": {
|
||||||
|
"default": false,
|
||||||
|
"description": "if true, rename existing folders without titles to include titles (default: false)",
|
||||||
|
"title": "Rename Existing Folders",
|
||||||
|
"type": "boolean"
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
"title": "DownloadConfig",
|
"title": "DownloadConfig",
|
||||||
@@ -346,7 +360,7 @@
|
|||||||
"type": "object"
|
"type": "object"
|
||||||
},
|
},
|
||||||
"UpdateCheckConfig": {
|
"UpdateCheckConfig": {
|
||||||
"description": "Configuration for update checking functionality.",
|
"description": "Configuration for update checking functionality.\n\nAttributes:\n enabled: Whether update checking is enabled.\n channel: Which release channel to check ('latest' for stable, 'preview' for prereleases).\n interval: How often to check for updates (e.g. '7d', '1d').\n If the interval is invalid, too short (<1d), or too long (>30d),\n the bot will log a warning and use a default interval for this run:\n - 1d for 'preview' channel\n - 7d for 'latest' channel\n The config file is not changed automatically; please fix your config to avoid repeated warnings.",
|
||||||
"properties": {
|
"properties": {
|
||||||
"enabled": {
|
"enabled": {
|
||||||
"default": true,
|
"default": true,
|
||||||
|
|||||||
@@ -45,15 +45,35 @@ class AdExtractor(WebScrapingMixin):
|
|||||||
os.mkdir(relative_directory)
|
os.mkdir(relative_directory)
|
||||||
LOG.info("Created ads directory at ./%s.", relative_directory)
|
LOG.info("Created ads directory at ./%s.", relative_directory)
|
||||||
|
|
||||||
new_base_dir = os.path.join(relative_directory, f"ad_{ad_id}")
|
# First, extract ad info to get the title
|
||||||
|
temp_dir = os.path.join(relative_directory, f"ad_{ad_id}")
|
||||||
|
ad_cfg:AdPartial = await self._extract_ad_page_info(temp_dir, ad_id)
|
||||||
|
|
||||||
|
# Create folder name with ad title
|
||||||
|
sanitized_title = misc.sanitize_folder_name(ad_cfg.title, self.config.download.folder_name_max_length)
|
||||||
|
new_base_dir = os.path.join(relative_directory, f"ad_{ad_id}_{sanitized_title}")
|
||||||
|
|
||||||
|
# If the folder with title already exists, delete it
|
||||||
if os.path.exists(new_base_dir):
|
if os.path.exists(new_base_dir):
|
||||||
LOG.info("Deleting current folder of ad %s...", ad_id)
|
LOG.info("Deleting current folder of ad %s...", ad_id)
|
||||||
shutil.rmtree(new_base_dir)
|
shutil.rmtree(new_base_dir)
|
||||||
os.mkdir(new_base_dir)
|
|
||||||
LOG.info("New directory for ad created at %s.", new_base_dir)
|
|
||||||
|
|
||||||
# call extraction function
|
# If the old folder without title exists, handle based on configuration
|
||||||
ad_cfg:AdPartial = await self._extract_ad_page_info(new_base_dir, ad_id)
|
if os.path.exists(temp_dir):
|
||||||
|
if self.config.download.rename_existing_folders:
|
||||||
|
LOG.info("Renaming folder from %s to %s for ad %s...",
|
||||||
|
os.path.basename(temp_dir), os.path.basename(new_base_dir), ad_id)
|
||||||
|
os.rename(temp_dir, new_base_dir)
|
||||||
|
else:
|
||||||
|
# Use the existing folder without renaming
|
||||||
|
new_base_dir = temp_dir
|
||||||
|
LOG.info("Using existing folder for ad %s at %s.", ad_id, new_base_dir)
|
||||||
|
else:
|
||||||
|
# Create new directory with title
|
||||||
|
os.mkdir(new_base_dir)
|
||||||
|
LOG.info("New directory for ad created at %s.", new_base_dir)
|
||||||
|
|
||||||
|
# Save the ad configuration file
|
||||||
ad_file_path = new_base_dir + "/" + f"ad_{ad_id}.yaml"
|
ad_file_path = new_base_dir + "/" + f"ad_{ad_id}.yaml"
|
||||||
dicts.save_dict(
|
dicts.save_dict(
|
||||||
ad_file_path,
|
ad_file_path,
|
||||||
|
|||||||
@@ -66,6 +66,16 @@ class DownloadConfig(ContextualModel):
|
|||||||
default_factory = list,
|
default_factory = list,
|
||||||
description = "list of shipping options to exclude, e.g. ['DHL_2', 'DHL_5']"
|
description = "list of shipping options to exclude, e.g. ['DHL_2', 'DHL_5']"
|
||||||
)
|
)
|
||||||
|
folder_name_max_length:int = Field(
|
||||||
|
default = 100,
|
||||||
|
ge = 10,
|
||||||
|
le = 255,
|
||||||
|
description = "maximum length for folder names when downloading ads (default: 100)"
|
||||||
|
)
|
||||||
|
rename_existing_folders:bool = Field(
|
||||||
|
default = False,
|
||||||
|
description = "if true, rename existing folders without titles to include titles (default: false)"
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
class BrowserConfig(ContextualModel):
|
class BrowserConfig(ContextualModel):
|
||||||
|
|||||||
@@ -176,6 +176,8 @@ kleinanzeigen_bot/extract.py:
|
|||||||
"Created ads directory at ./%s.": "Verzeichnis für Anzeigen erstellt unter ./%s."
|
"Created ads directory at ./%s.": "Verzeichnis für Anzeigen erstellt unter ./%s."
|
||||||
"Deleting current folder of ad %s...": "Lösche aktuellen Ordner der Anzeige %s..."
|
"Deleting current folder of ad %s...": "Lösche aktuellen Ordner der Anzeige %s..."
|
||||||
"New directory for ad created at %s.": "Neues Verzeichnis für Anzeige erstellt unter %s."
|
"New directory for ad created at %s.": "Neues Verzeichnis für Anzeige erstellt unter %s."
|
||||||
|
"Renaming folder from %s to %s for ad %s...": "Benenne Ordner von %s zu %s für Anzeige %s um..."
|
||||||
|
"Using existing folder for ad %s at %s.": "Verwende bestehenden Ordner für Anzeige %s unter %s."
|
||||||
|
|
||||||
_download_images_from_ad_page:
|
_download_images_from_ad_page:
|
||||||
"Found %s.": "%s gefunden."
|
"Found %s.": "%s gefunden."
|
||||||
|
|||||||
@@ -2,11 +2,14 @@
|
|||||||
# SPDX-License-Identifier: AGPL-3.0-or-later
|
# SPDX-License-Identifier: AGPL-3.0-or-later
|
||||||
# SPDX-ArtifactOfProjectHomePage: https://github.com/Second-Hand-Friends/kleinanzeigen-bot/
|
# SPDX-ArtifactOfProjectHomePage: https://github.com/Second-Hand-Friends/kleinanzeigen-bot/
|
||||||
import asyncio, decimal, re, sys, time # isort: skip
|
import asyncio, decimal, re, sys, time # isort: skip
|
||||||
|
import unicodedata
|
||||||
from collections.abc import Callable
|
from collections.abc import Callable
|
||||||
from datetime import datetime, timedelta, timezone
|
from datetime import datetime, timedelta, timezone
|
||||||
from gettext import gettext as _
|
from gettext import gettext as _
|
||||||
from typing import Any, Mapping, TypeVar
|
from typing import Any, Mapping, TypeVar
|
||||||
|
|
||||||
|
from sanitize_filename import sanitize
|
||||||
|
|
||||||
from . import i18n
|
from . import i18n
|
||||||
|
|
||||||
# https://mypy.readthedocs.io/en/stable/generics.html#generic-functions
|
# https://mypy.readthedocs.io/en/stable/generics.html#generic-functions
|
||||||
@@ -263,3 +266,36 @@ def format_timedelta(td:timedelta) -> str:
|
|||||||
parts.append(i18n.pluralize("second", seconds))
|
parts.append(i18n.pluralize("second", seconds))
|
||||||
|
|
||||||
return ", ".join(parts) if parts else i18n.pluralize("second", 0)
|
return ", ".join(parts) if parts else i18n.pluralize("second", 0)
|
||||||
|
|
||||||
|
|
||||||
|
def sanitize_folder_name(name:str, max_length:int = 100) -> str:
|
||||||
|
"""
|
||||||
|
Sanitize a string for use as a folder name using `sanitize-filename`.
|
||||||
|
|
||||||
|
- Cross-platform safe (Windows/macOS/Linux)
|
||||||
|
- Removes invalid characters and Windows reserved names
|
||||||
|
- Handles path traversal attempts
|
||||||
|
- Truncates to `max_length`
|
||||||
|
|
||||||
|
Args:
|
||||||
|
name: The input string.
|
||||||
|
max_length: Maximum length of the resulting folder name (default: 100).
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
A sanitized folder name (falls back to "untitled" when empty).
|
||||||
|
"""
|
||||||
|
# Normalize whitespace and handle empty input
|
||||||
|
raw = (name or "").strip()
|
||||||
|
if not raw:
|
||||||
|
return "untitled"
|
||||||
|
|
||||||
|
raw = unicodedata.normalize("NFC", raw)
|
||||||
|
safe:str = sanitize(raw)
|
||||||
|
|
||||||
|
# Truncate with word-boundary preference
|
||||||
|
if len(safe) > max_length:
|
||||||
|
truncated = safe[:max_length]
|
||||||
|
last_break = max(truncated.rfind(" "), truncated.rfind("_"))
|
||||||
|
safe = truncated[:last_break] if last_break > int(max_length * 0.7) else truncated
|
||||||
|
|
||||||
|
return safe
|
||||||
|
|||||||
@@ -732,16 +732,17 @@ class TestAdExtractorDownload:
|
|||||||
patch("os.path.isdir") as mock_isdir, \
|
patch("os.path.isdir") as mock_isdir, \
|
||||||
patch("os.makedirs") as mock_makedirs, \
|
patch("os.makedirs") as mock_makedirs, \
|
||||||
patch("os.mkdir") as mock_mkdir, \
|
patch("os.mkdir") as mock_mkdir, \
|
||||||
|
patch("os.rename") as mock_rename, \
|
||||||
patch("shutil.rmtree") as mock_rmtree, \
|
patch("shutil.rmtree") as mock_rmtree, \
|
||||||
patch("kleinanzeigen_bot.extract.dicts.save_dict", autospec = True) as mock_save_dict, \
|
patch("kleinanzeigen_bot.extract.dicts.save_dict", autospec = True) as mock_save_dict, \
|
||||||
patch.object(extractor, "_extract_ad_page_info", new_callable = AsyncMock) as mock_extract:
|
patch.object(extractor, "_extract_ad_page_info", new_callable = AsyncMock) as mock_extract:
|
||||||
|
|
||||||
base_dir = "downloaded-ads"
|
base_dir = "downloaded-ads"
|
||||||
ad_dir = os.path.join(base_dir, "ad_12345")
|
final_dir = os.path.join(base_dir, "ad_12345_Test Advertisement Title")
|
||||||
yaml_path = os.path.join(ad_dir, "ad_12345.yaml")
|
yaml_path = os.path.join(final_dir, "ad_12345.yaml")
|
||||||
|
|
||||||
# Configure mocks for directory checks
|
# Configure mocks for directory checks
|
||||||
existing_paths = {base_dir, ad_dir}
|
existing_paths = {base_dir, final_dir} # Final directory with title exists
|
||||||
mock_exists.side_effect = lambda path: path in existing_paths
|
mock_exists.side_effect = lambda path: path in existing_paths
|
||||||
mock_isdir.side_effect = lambda path: path == base_dir
|
mock_isdir.side_effect = lambda path: path == base_dir
|
||||||
|
|
||||||
@@ -763,12 +764,12 @@ class TestAdExtractorDownload:
|
|||||||
|
|
||||||
# Verify the correct functions were called
|
# Verify the correct functions were called
|
||||||
mock_extract.assert_called_once()
|
mock_extract.assert_called_once()
|
||||||
mock_rmtree.assert_called_once_with(ad_dir)
|
mock_rmtree.assert_called_once_with(final_dir) # Delete the final directory with title
|
||||||
mock_mkdir.assert_called_once_with(ad_dir)
|
mock_mkdir.assert_called_once_with(final_dir) # Create the final directory with title
|
||||||
mock_makedirs.assert_not_called() # Directory already exists
|
mock_makedirs.assert_not_called() # Directory already exists
|
||||||
|
mock_rename.assert_not_called() # No renaming needed
|
||||||
|
|
||||||
# Get the actual call arguments
|
# Get the actual call arguments
|
||||||
# Workaround for hard-coded path in download_ad
|
|
||||||
actual_call = mock_save_dict.call_args
|
actual_call = mock_save_dict.call_args
|
||||||
assert actual_call is not None
|
assert actual_call is not None
|
||||||
actual_path = actual_call[0][0].replace("/", os.path.sep)
|
actual_path = actual_call[0][0].replace("/", os.path.sep)
|
||||||
@@ -790,13 +791,14 @@ class TestAdExtractorDownload:
|
|||||||
patch("os.path.isdir") as mock_isdir, \
|
patch("os.path.isdir") as mock_isdir, \
|
||||||
patch("os.makedirs") as mock_makedirs, \
|
patch("os.makedirs") as mock_makedirs, \
|
||||||
patch("os.mkdir") as mock_mkdir, \
|
patch("os.mkdir") as mock_mkdir, \
|
||||||
|
patch("os.rename") as mock_rename, \
|
||||||
patch("shutil.rmtree") as mock_rmtree, \
|
patch("shutil.rmtree") as mock_rmtree, \
|
||||||
patch("kleinanzeigen_bot.extract.dicts.save_dict", autospec = True) as mock_save_dict, \
|
patch("kleinanzeigen_bot.extract.dicts.save_dict", autospec = True) as mock_save_dict, \
|
||||||
patch.object(extractor, "_extract_ad_page_info", new_callable = AsyncMock) as mock_extract:
|
patch.object(extractor, "_extract_ad_page_info", new_callable = AsyncMock) as mock_extract:
|
||||||
|
|
||||||
base_dir = "downloaded-ads"
|
base_dir = "downloaded-ads"
|
||||||
ad_dir = os.path.join(base_dir, "ad_12345")
|
final_dir = os.path.join(base_dir, "ad_12345_Test Advertisement Title")
|
||||||
yaml_path = os.path.join(ad_dir, "ad_12345.yaml")
|
yaml_path = os.path.join(final_dir, "ad_12345.yaml")
|
||||||
|
|
||||||
# Configure mocks for directory checks
|
# Configure mocks for directory checks
|
||||||
mock_exists.return_value = False
|
mock_exists.return_value = False
|
||||||
@@ -823,9 +825,118 @@ class TestAdExtractorDownload:
|
|||||||
mock_rmtree.assert_not_called() # No directory to remove
|
mock_rmtree.assert_not_called() # No directory to remove
|
||||||
mock_mkdir.assert_has_calls([
|
mock_mkdir.assert_has_calls([
|
||||||
call(base_dir),
|
call(base_dir),
|
||||||
call(ad_dir)
|
call(final_dir) # Create the final directory with title
|
||||||
])
|
])
|
||||||
mock_makedirs.assert_not_called() # Using mkdir instead
|
mock_makedirs.assert_not_called() # Using mkdir instead
|
||||||
|
mock_rename.assert_not_called() # No renaming needed
|
||||||
|
|
||||||
|
# Get the actual call arguments
|
||||||
|
actual_call = mock_save_dict.call_args
|
||||||
|
assert actual_call is not None
|
||||||
|
actual_path = actual_call[0][0].replace("/", os.path.sep)
|
||||||
|
assert actual_path == yaml_path
|
||||||
|
assert actual_call[0][1] == mock_extract.return_value.model_dump()
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_download_ad_use_existing_folder(self, extractor:AdExtractor) -> None:
|
||||||
|
"""Test downloading an ad when an old folder without title exists (default behavior)."""
|
||||||
|
with patch("os.path.exists") as mock_exists, \
|
||||||
|
patch("os.path.isdir") as mock_isdir, \
|
||||||
|
patch("os.makedirs") as mock_makedirs, \
|
||||||
|
patch("os.mkdir") as mock_mkdir, \
|
||||||
|
patch("os.rename") as mock_rename, \
|
||||||
|
patch("shutil.rmtree") as mock_rmtree, \
|
||||||
|
patch("kleinanzeigen_bot.extract.dicts.save_dict", autospec = True) as mock_save_dict, \
|
||||||
|
patch.object(extractor, "_extract_ad_page_info", new_callable = AsyncMock) as mock_extract:
|
||||||
|
|
||||||
|
base_dir = "downloaded-ads"
|
||||||
|
temp_dir = os.path.join(base_dir, "ad_12345")
|
||||||
|
yaml_path = os.path.join(temp_dir, "ad_12345.yaml")
|
||||||
|
|
||||||
|
# Configure mocks for directory checks
|
||||||
|
# Base directory exists, temp directory exists
|
||||||
|
existing_paths = {base_dir, temp_dir}
|
||||||
|
mock_exists.side_effect = lambda path: path in existing_paths
|
||||||
|
mock_isdir.side_effect = lambda path: path == base_dir
|
||||||
|
|
||||||
|
mock_extract.return_value = AdPartial.model_validate({
|
||||||
|
"title": "Test Advertisement Title",
|
||||||
|
"description": "Test Description",
|
||||||
|
"category": "Dienstleistungen",
|
||||||
|
"price": 100,
|
||||||
|
"images": [],
|
||||||
|
"contact": {
|
||||||
|
"name": "Test User",
|
||||||
|
"street": "Test Street 123",
|
||||||
|
"zipcode": "12345",
|
||||||
|
"location": "Test City"
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
await extractor.download_ad(12345)
|
||||||
|
|
||||||
|
# Verify the correct functions were called
|
||||||
|
mock_extract.assert_called_once()
|
||||||
|
mock_rmtree.assert_not_called() # No directory to remove
|
||||||
|
mock_mkdir.assert_not_called() # Base directory already exists
|
||||||
|
mock_makedirs.assert_not_called() # Using mkdir instead
|
||||||
|
mock_rename.assert_not_called() # No renaming (default behavior)
|
||||||
|
|
||||||
|
# Get the actual call arguments
|
||||||
|
actual_call = mock_save_dict.call_args
|
||||||
|
assert actual_call is not None
|
||||||
|
actual_path = actual_call[0][0].replace("/", os.path.sep)
|
||||||
|
assert actual_path == yaml_path
|
||||||
|
assert actual_call[0][1] == mock_extract.return_value.model_dump()
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_download_ad_rename_existing_folder_when_enabled(self, extractor:AdExtractor) -> None:
|
||||||
|
"""Test downloading an ad when an old folder without title exists and renaming is enabled."""
|
||||||
|
# Enable renaming in config
|
||||||
|
extractor.config.download.rename_existing_folders = True
|
||||||
|
|
||||||
|
with patch("os.path.exists") as mock_exists, \
|
||||||
|
patch("os.path.isdir") as mock_isdir, \
|
||||||
|
patch("os.makedirs") as mock_makedirs, \
|
||||||
|
patch("os.mkdir") as mock_mkdir, \
|
||||||
|
patch("os.rename") as mock_rename, \
|
||||||
|
patch("shutil.rmtree") as mock_rmtree, \
|
||||||
|
patch("kleinanzeigen_bot.extract.dicts.save_dict", autospec = True) as mock_save_dict, \
|
||||||
|
patch.object(extractor, "_extract_ad_page_info", new_callable = AsyncMock) as mock_extract:
|
||||||
|
|
||||||
|
base_dir = "downloaded-ads"
|
||||||
|
temp_dir = os.path.join(base_dir, "ad_12345")
|
||||||
|
final_dir = os.path.join(base_dir, "ad_12345_Test Advertisement Title")
|
||||||
|
yaml_path = os.path.join(final_dir, "ad_12345.yaml")
|
||||||
|
|
||||||
|
# Configure mocks for directory checks
|
||||||
|
# Base directory exists, temp directory exists, final directory doesn't exist
|
||||||
|
existing_paths = {base_dir, temp_dir}
|
||||||
|
mock_exists.side_effect = lambda path: path in existing_paths
|
||||||
|
mock_isdir.side_effect = lambda path: path == base_dir
|
||||||
|
|
||||||
|
mock_extract.return_value = AdPartial.model_validate({
|
||||||
|
"title": "Test Advertisement Title",
|
||||||
|
"description": "Test Description",
|
||||||
|
"category": "Dienstleistungen",
|
||||||
|
"price": 100,
|
||||||
|
"images": [],
|
||||||
|
"contact": {
|
||||||
|
"name": "Test User",
|
||||||
|
"street": "Test Street 123",
|
||||||
|
"zipcode": "12345",
|
||||||
|
"location": "Test City"
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
await extractor.download_ad(12345)
|
||||||
|
|
||||||
|
# Verify the correct functions were called
|
||||||
|
mock_extract.assert_called_once()
|
||||||
|
mock_rmtree.assert_not_called() # No directory to remove
|
||||||
|
mock_mkdir.assert_not_called() # Base directory already exists
|
||||||
|
mock_makedirs.assert_not_called() # Using mkdir instead
|
||||||
|
mock_rename.assert_called_once_with(temp_dir, final_dir) # Rename temp to final
|
||||||
|
|
||||||
# Get the actual call arguments
|
# Get the actual call arguments
|
||||||
actual_call = mock_save_dict.call_args
|
actual_call = mock_save_dict.call_args
|
||||||
|
|||||||
@@ -234,13 +234,13 @@ class TestUpdateChecker:
|
|||||||
|
|
||||||
def test_update_check_state_invalid_data(self, state_file:Path) -> None:
|
def test_update_check_state_invalid_data(self, state_file:Path) -> None:
|
||||||
"""Test that loading invalid state data returns a new state."""
|
"""Test that loading invalid state data returns a new state."""
|
||||||
state_file.write_text("invalid json")
|
state_file.write_text("invalid json", encoding = "utf-8")
|
||||||
state = UpdateCheckState.load(state_file)
|
state = UpdateCheckState.load(state_file)
|
||||||
assert state.last_check is None
|
assert state.last_check is None
|
||||||
|
|
||||||
def test_update_check_state_missing_last_check(self, state_file:Path) -> None:
|
def test_update_check_state_missing_last_check(self, state_file:Path) -> None:
|
||||||
"""Test that loading state data without last_check returns a new state."""
|
"""Test that loading state data without last_check returns a new state."""
|
||||||
state_file.write_text("{}")
|
state_file.write_text("{}", encoding = "utf-8")
|
||||||
state = UpdateCheckState.load(state_file)
|
state = UpdateCheckState.load(state_file)
|
||||||
assert state.last_check is None
|
assert state.last_check is None
|
||||||
|
|
||||||
@@ -371,7 +371,7 @@ class TestUpdateChecker:
|
|||||||
|
|
||||||
def test_update_check_state_invalid_date(self, state_file:Path) -> None:
|
def test_update_check_state_invalid_date(self, state_file:Path) -> None:
|
||||||
"""Test that loading a state file with an invalid date string for last_check returns a new state (triggers ValueError)."""
|
"""Test that loading a state file with an invalid date string for last_check returns a new state (triggers ValueError)."""
|
||||||
state_file.write_text(json.dumps({"last_check": "not-a-date"}))
|
state_file.write_text(json.dumps({"last_check": "not-a-date"}), encoding = "utf-8")
|
||||||
state = UpdateCheckState.load(state_file)
|
state = UpdateCheckState.load(state_file)
|
||||||
assert state.last_check is None
|
assert state.last_check is None
|
||||||
|
|
||||||
@@ -471,7 +471,7 @@ class TestUpdateChecker:
|
|||||||
# Create a state with version 0 (old format)
|
# Create a state with version 0 (old format)
|
||||||
state_file.write_text(json.dumps({
|
state_file.write_text(json.dumps({
|
||||||
"last_check": datetime.now(timezone.utc).isoformat()
|
"last_check": datetime.now(timezone.utc).isoformat()
|
||||||
}))
|
}), encoding = "utf-8")
|
||||||
|
|
||||||
# Load the state - should migrate to version 1
|
# Load the state - should migrate to version 1
|
||||||
state = UpdateCheckState.load(state_file)
|
state = UpdateCheckState.load(state_file)
|
||||||
@@ -490,7 +490,7 @@ class TestUpdateChecker:
|
|||||||
old_time = datetime.now(timezone.utc)
|
old_time = datetime.now(timezone.utc)
|
||||||
state_file.write_text(json.dumps({
|
state_file.write_text(json.dumps({
|
||||||
"last_check": old_time.isoformat()
|
"last_check": old_time.isoformat()
|
||||||
}))
|
}), encoding = "utf-8")
|
||||||
|
|
||||||
# Load the state - should migrate to version 1
|
# Load the state - should migrate to version 1
|
||||||
state = UpdateCheckState.load(state_file)
|
state = UpdateCheckState.load(state_file)
|
||||||
@@ -522,7 +522,7 @@ class TestUpdateChecker:
|
|||||||
def test_update_check_state_load_errors(self, state_file:Path) -> None:
|
def test_update_check_state_load_errors(self, state_file:Path) -> None:
|
||||||
"""Test that load errors are handled gracefully."""
|
"""Test that load errors are handled gracefully."""
|
||||||
# Test invalid JSON
|
# Test invalid JSON
|
||||||
state_file.write_text("invalid json")
|
state_file.write_text("invalid json", encoding = "utf-8")
|
||||||
state = UpdateCheckState.load(state_file)
|
state = UpdateCheckState.load(state_file)
|
||||||
assert state.version == 1
|
assert state.version == 1
|
||||||
assert state.last_check is None
|
assert state.last_check is None
|
||||||
@@ -531,7 +531,7 @@ class TestUpdateChecker:
|
|||||||
state_file.write_text(json.dumps({
|
state_file.write_text(json.dumps({
|
||||||
"version": 1,
|
"version": 1,
|
||||||
"last_check": "invalid-date"
|
"last_check": "invalid-date"
|
||||||
}))
|
}), encoding = "utf-8")
|
||||||
state = UpdateCheckState.load(state_file)
|
state = UpdateCheckState.load(state_file)
|
||||||
assert state.version == 1
|
assert state.version == 1
|
||||||
assert state.last_check is None
|
assert state.last_check is None
|
||||||
@@ -542,7 +542,7 @@ class TestUpdateChecker:
|
|||||||
state_file.write_text(json.dumps({
|
state_file.write_text(json.dumps({
|
||||||
"version": 1,
|
"version": 1,
|
||||||
"last_check": "2024-03-20T12:00:00"
|
"last_check": "2024-03-20T12:00:00"
|
||||||
}))
|
}), encoding = "utf-8")
|
||||||
state = UpdateCheckState.load(state_file)
|
state = UpdateCheckState.load(state_file)
|
||||||
assert state.last_check is not None
|
assert state.last_check is not None
|
||||||
assert state.last_check.tzinfo == timezone.utc
|
assert state.last_check.tzinfo == timezone.utc
|
||||||
@@ -552,7 +552,7 @@ class TestUpdateChecker:
|
|||||||
state_file.write_text(json.dumps({
|
state_file.write_text(json.dumps({
|
||||||
"version": 1,
|
"version": 1,
|
||||||
"last_check": "2024-03-20T12:00:00+02:00" # 2 hours ahead of UTC
|
"last_check": "2024-03-20T12:00:00+02:00" # 2 hours ahead of UTC
|
||||||
}))
|
}), encoding = "utf-8")
|
||||||
state = UpdateCheckState.load(state_file)
|
state = UpdateCheckState.load(state_file)
|
||||||
assert state.last_check is not None
|
assert state.last_check is not None
|
||||||
assert state.last_check.tzinfo == timezone.utc
|
assert state.last_check.tzinfo == timezone.utc
|
||||||
|
|||||||
@@ -7,8 +7,10 @@ import sys
|
|||||||
from datetime import datetime, timedelta, timezone
|
from datetime import datetime, timedelta, timezone
|
||||||
|
|
||||||
import pytest
|
import pytest
|
||||||
|
from sanitize_filename import sanitize
|
||||||
|
|
||||||
from kleinanzeigen_bot.utils import misc
|
from kleinanzeigen_bot.utils import misc
|
||||||
|
from kleinanzeigen_bot.utils.misc import sanitize_folder_name
|
||||||
|
|
||||||
|
|
||||||
def test_now_returns_utc_datetime() -> None:
|
def test_now_returns_utc_datetime() -> None:
|
||||||
@@ -133,3 +135,133 @@ def test_ensure_non_callable_truthy_and_falsy() -> None:
|
|||||||
misc.ensure("", "Should fail for empty string")
|
misc.ensure("", "Should fail for empty string")
|
||||||
with pytest.raises(AssertionError):
|
with pytest.raises(AssertionError):
|
||||||
misc.ensure(None, "Should fail for None")
|
misc.ensure(None, "Should fail for None")
|
||||||
|
|
||||||
|
|
||||||
|
# --- Test sanitize_folder_name function ---
|
||||||
|
@pytest.mark.parametrize(
|
||||||
|
("test_input", "expected_output", "description"),
|
||||||
|
[
|
||||||
|
# Basic sanitization
|
||||||
|
("My Ad Title!", "My Ad Title!", "Basic sanitization"),
|
||||||
|
|
||||||
|
# Unicode normalization (sanitize-filename changes normalization)
|
||||||
|
("café", "cafe\u0301", "Unicode normalization"),
|
||||||
|
("caf\u00e9", "cafe\u0301", "Unicode normalization from escaped"),
|
||||||
|
|
||||||
|
# Edge cases
|
||||||
|
("", "untitled", "Empty string"),
|
||||||
|
(" ", "untitled", "Whitespace only"),
|
||||||
|
("___", "___", "Multiple underscores (not collapsed)"),
|
||||||
|
|
||||||
|
# Control characters (removed by sanitize-filename)
|
||||||
|
("Ad\x00with\x1fcontrol", "Adwithcontrol", "Control characters removed"),
|
||||||
|
|
||||||
|
# Multiple consecutive underscores (sanitize-filename doesn't collapse them)
|
||||||
|
("Ad___with___multiple___underscores", "Ad___with___multiple___underscores", "Multiple underscores preserved"),
|
||||||
|
|
||||||
|
# Special characters (removed by sanitize-filename)
|
||||||
|
('file<with>invalid:chars"|?*', "filewithinvalidchars", "Special characters removed"),
|
||||||
|
("file\\with\\backslashes", "filewithbackslashes", "Backslashes removed"),
|
||||||
|
("file/with/slashes", "filewithslashes", "Forward slashes removed"),
|
||||||
|
|
||||||
|
# Path traversal attempts (handled by sanitize-filename)
|
||||||
|
("Title with ../../etc/passwd", "Title with ....etcpasswd", "Path traversal attempt"),
|
||||||
|
("Title with C:\\Windows\\System32\\cmd.exe", "Title with CWindowsSystem32cmd.exe", "Windows path traversal"),
|
||||||
|
|
||||||
|
# XSS attempts (handled by sanitize-filename)
|
||||||
|
('Title with <script>alert("xss")</script>', "Title with scriptalert(xss)script", "XSS attempt"),
|
||||||
|
],
|
||||||
|
)
|
||||||
|
def test_sanitize_folder_name_basic(test_input:str, expected_output:str, description:str) -> None:
|
||||||
|
"""Test sanitize_folder_name function with various inputs."""
|
||||||
|
result = sanitize_folder_name(test_input)
|
||||||
|
assert result == expected_output, f"Failed for '{test_input}': {description}"
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.parametrize(
|
||||||
|
("test_input", "max_length", "expected_output", "description"),
|
||||||
|
[
|
||||||
|
# Length truncation
|
||||||
|
("Very long advertisement title that exceeds the maximum length and should be truncated", 50,
|
||||||
|
"Very long advertisement title that exceeds the", "Length truncation"),
|
||||||
|
|
||||||
|
# Word boundary truncation
|
||||||
|
("Short words but very long title", 20, "Short words but", "Word boundary truncation"),
|
||||||
|
|
||||||
|
# Edge case: no word boundary found
|
||||||
|
("VeryLongWordWithoutSpaces", 10, "VeryLongWo", "No word boundary truncation"),
|
||||||
|
|
||||||
|
# Test default max_length (100)
|
||||||
|
("This is a reasonable advertisement title that fits within the default limit", 100,
|
||||||
|
"This is a reasonable advertisement title that fits within the default limit", "Default max_length"),
|
||||||
|
],
|
||||||
|
)
|
||||||
|
def test_sanitize_folder_name_truncation(test_input:str, max_length:int, expected_output:str, description:str) -> None:
|
||||||
|
"""Test sanitize_folder_name function with length truncation."""
|
||||||
|
result = sanitize_folder_name(test_input, max_length = max_length)
|
||||||
|
assert len(result) <= max_length, f"Result exceeds max_length for '{test_input}': {description}"
|
||||||
|
assert result == expected_output, f"Failed for '{test_input}' with max_length={max_length}: {description}"
|
||||||
|
|
||||||
|
|
||||||
|
# --- Test sanitize-filename behavior directly (since it's consistent across platforms) ---
|
||||||
|
@pytest.mark.parametrize(
|
||||||
|
("test_input", "expected_output"),
|
||||||
|
[
|
||||||
|
# Test sanitize-filename behavior (consistent across platforms)
|
||||||
|
("test/file", "testfile"),
|
||||||
|
("test\\file", "testfile"),
|
||||||
|
("test<file", "testfile"),
|
||||||
|
("test>file", "testfile"),
|
||||||
|
('test"file', "testfile"),
|
||||||
|
("test|file", "testfile"),
|
||||||
|
("test?file", "testfile"),
|
||||||
|
("test*file", "testfile"),
|
||||||
|
("test:file", "testfile"),
|
||||||
|
("CON", "__CON"),
|
||||||
|
("PRN", "__PRN"),
|
||||||
|
("AUX", "__AUX"),
|
||||||
|
("NUL", "__NUL"),
|
||||||
|
("COM1", "__COM1"),
|
||||||
|
("LPT1", "__LPT1"),
|
||||||
|
("file/with/slashes", "filewithslashes"),
|
||||||
|
("file\\with\\backslashes", "filewithbackslashes"),
|
||||||
|
('file<with>invalid:chars"|?*', "filewithinvalidchars"),
|
||||||
|
("file\x00with\x1fcontrol", "filewithcontrol"),
|
||||||
|
("file___with___underscores", "file___with___underscores"),
|
||||||
|
],
|
||||||
|
)
|
||||||
|
def test_sanitize_filename_behavior(test_input:str, expected_output:str) -> None:
|
||||||
|
"""Test sanitize-filename behavior directly (consistent across platforms)."""
|
||||||
|
result = sanitize(test_input)
|
||||||
|
assert result == expected_output, f"sanitize-filename behavior mismatch for '{test_input}'"
|
||||||
|
|
||||||
|
|
||||||
|
# --- Test sanitize_folder_name cross-platform consistency ---
|
||||||
|
@pytest.mark.parametrize(
|
||||||
|
"test_input",
|
||||||
|
[
|
||||||
|
"normal_filename",
|
||||||
|
"filename with spaces",
|
||||||
|
"filename_with_underscores",
|
||||||
|
"filename-with-dashes",
|
||||||
|
"filename.with.dots",
|
||||||
|
"filename123",
|
||||||
|
"café_filename",
|
||||||
|
"filename\x00with\x1fcontrol", # Control characters
|
||||||
|
],
|
||||||
|
)
|
||||||
|
def test_sanitize_folder_name_cross_platform_consistency(
|
||||||
|
monkeypatch:pytest.MonkeyPatch,
|
||||||
|
test_input:str
|
||||||
|
) -> None:
|
||||||
|
"""Test that sanitize_folder_name produces consistent results across platforms for safe inputs."""
|
||||||
|
platforms = ["Windows", "Darwin", "Linux"]
|
||||||
|
results = []
|
||||||
|
|
||||||
|
for platform in platforms:
|
||||||
|
monkeypatch.setattr("sys.platform", platform.lower())
|
||||||
|
result = sanitize_folder_name(test_input)
|
||||||
|
results.append(result)
|
||||||
|
|
||||||
|
# All platforms should produce the same result for safe inputs
|
||||||
|
assert len(set(results)) == 1, f"Cross-platform inconsistency for '{test_input}': {results}"
|
||||||
|
|||||||
Reference in New Issue
Block a user