diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml
index f6eee8c..9023e53 100644
--- a/.github/workflows/build.yml
+++ b/.github/workflows/build.yml
@@ -172,10 +172,11 @@ jobs:
set -eux
case "${{ matrix.os }}" in
- ubuntu-*)
- sudo apt-get install --no-install-recommends -y xvfb
- xvfb-run pdm run itest:cov -vv
- ;;
+ ubuntu-*)
+ sudo apt-get install --no-install-recommends -y xvfb
+ # Run tests INSIDE xvfb context
+ xvfb-run bash -c 'pdm run itest:cov -vv'
+ ;;
*) pdm run itest:cov -vv
;;
esac
diff --git a/README.md b/README.md
index 335b38a..d763a37 100644
--- a/README.md
+++ b/README.md
@@ -248,6 +248,25 @@ Limitation of `download`: It's only possible to extract the cheapest given shipp
All configuration files can be in YAML or JSON format.
+### Installation modes (portable vs. system-wide)
+
+On first run, the app may ask which installation mode to use. In non-interactive environments (CI/headless), it does not prompt and defaults to portable mode. The `--config` and `--logfile` options override only their specific paths; all other mode-dependent paths, and the installation mode itself, are unaffected.
+
+1. **Portable mode (recommended for most users, especially on Windows):**
+ - Stores config, logs, downloads, and state in the current directory
+ - No admin permissions required
+ - Easy backup/migration; works from USB drives
+
+2. **System-wide mode (advanced users / multi-user setups):**
+ - Stores files in OS-standard locations
+ - Cleaner directory structure; better separation from working directory
+ - Requires proper permissions for user data directories
+
+**OS notes (brief):**
+- **Windows:** System-wide uses AppData (Roaming/Local); portable keeps everything beside the `.exe`.
+- **Linux:** System-wide follows XDG Base Directory spec; portable stays in the current working directory.
+- **macOS:** System-wide uses `~/Library/Application Support/kleinanzeigen-bot` (and related dirs); portable stays in the current directory.
+
### 1) Main configuration
When executing the app it by default looks for a `config.yaml` file in the current directory. If it does not exist it will be created automatically.
diff --git a/pyproject.toml b/pyproject.toml
index ce78689..b2e3cf1 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -111,7 +111,8 @@ lint = { composite = ["lint:ruff", "lint:mypy", "lint:pyright"] }
# Run unit tests only (exclude smoke and itest)
utest = "python -m pytest --capture=tee-sys -m \"not itest and not smoke\""
# Run integration tests only (exclude smoke)
-itest = "python -m pytest --capture=tee-sys -m \"itest and not smoke\""
+# Uses -n 0 to disable pytest-xdist parallelization, because browser-based tests are flaky when run with parallel workers
+itest = "python -m pytest --capture=tee-sys -m \"itest and not smoke\" -n 0"
# Run smoke tests only
smoke = "python -m pytest --capture=tee-sys -m smoke"
# Run all tests in order: unit, integration, smoke
@@ -126,7 +127,7 @@ test = { composite = ["utest", "itest", "smoke"] }
"coverage:prepare" = { shell = "python scripts/coverage_helper.py prepare" }
"test:cov" = { composite = ["coverage:prepare", "utest:cov", "itest:cov", "smoke:cov", "coverage:combine"] }
"utest:cov" = { shell = "python scripts/coverage_helper.py run .temp/.coverage-unit.sqlite .temp/coverage-unit.xml \"not itest and not smoke\"" }
-"itest:cov" = { shell = "python scripts/coverage_helper.py run .temp/.coverage-itest.sqlite .temp/coverage-integration.xml \"itest and not smoke\"" }
+"itest:cov" = { shell = "python scripts/coverage_helper.py run .temp/.coverage-itest.sqlite .temp/coverage-integration.xml \"itest and not smoke\" -n 0" }
"smoke:cov" = { shell = "python scripts/coverage_helper.py run .temp/.coverage-smoke.sqlite .temp/coverage-smoke.xml smoke" }
"coverage:combine" = { shell = "python scripts/coverage_helper.py combine .temp/.coverage-unit.sqlite .temp/.coverage-itest.sqlite .temp/.coverage-smoke.sqlite" }
# Run all tests with coverage in a single invocation
diff --git a/schemas/config.schema.json b/schemas/config.schema.json
index 08eb71f..e1be95a 100644
--- a/schemas/config.schema.json
+++ b/schemas/config.schema.json
@@ -185,7 +185,7 @@
"BrowserConfig": {
"properties": {
"arguments": {
- "description": "See https://peter.sh/experiments/chromium-command-line-switches/",
+ "description": "See https://peter.sh/experiments/chromium-command-line-switches/. Browser profile path is auto-configured based on installation mode (portable/XDG).",
"items": {
"type": "string"
},
@@ -227,8 +227,8 @@
"type": "null"
}
],
- "default": ".temp/browser-profile",
- "description": "See https://github.com/chromium/chromium/blob/main/docs/user_data_dir.md",
+ "default": null,
+ "description": "See https://github.com/chromium/chromium/blob/main/docs/user_data_dir.md. If not specified, defaults to XDG cache directory in XDG mode or .temp/browser-profile in portable mode.",
"title": "User Data Dir"
},
"profile_name": {
diff --git a/src/kleinanzeigen_bot/__init__.py b/src/kleinanzeigen_bot/__init__.py
index 87134db..7819157 100644
--- a/src/kleinanzeigen_bot/__init__.py
+++ b/src/kleinanzeigen_bot/__init__.py
@@ -28,7 +28,7 @@ from .utils.web_scraping_mixin import By, Element, Is, WebScrapingMixin
# W0406: possibly a bug, see https://github.com/PyCQA/pylint/issues/3933
-LOG: Final[loggers.Logger] = loggers.get_logger(__name__)
+LOG:Final[loggers.Logger] = loggers.get_logger(__name__)
LOG.setLevel(loggers.INFO)
colorama.just_fix_windows_console()
@@ -39,7 +39,7 @@ class AdUpdateStrategy(enum.Enum):
MODIFY = enum.auto()
-def _repost_cycle_ready(ad_cfg: Ad, ad_file_relative: str) -> bool:
+def _repost_cycle_ready(ad_cfg:Ad, ad_file_relative:str) -> bool:
"""
Check if the repost cycle delay has been satisfied.
@@ -72,7 +72,7 @@ def _repost_cycle_ready(ad_cfg: Ad, ad_file_relative: str) -> bool:
return True
-def _day_delay_elapsed(ad_cfg: Ad, ad_file_relative: str) -> bool:
+def _day_delay_elapsed(ad_cfg:Ad, ad_file_relative:str) -> bool:
"""
Check if the day delay has elapsed since the ad was last published.
@@ -100,7 +100,7 @@ def _day_delay_elapsed(ad_cfg: Ad, ad_file_relative: str) -> bool:
return True
-def apply_auto_price_reduction(ad_cfg: Ad, _ad_cfg_orig: dict[str, Any], ad_file_relative: str) -> None:
+def apply_auto_price_reduction(ad_cfg:Ad, _ad_cfg_orig:dict[str, Any], ad_file_relative:str) -> None:
"""
Apply automatic price reduction to an ad based on repost count and configuration.
@@ -132,7 +132,7 @@ def apply_auto_price_reduction(ad_cfg: Ad, _ad_cfg_orig: dict[str, Any], ad_file
applied_cycles = ad_cfg.price_reduction_count or 0
next_cycle = applied_cycles + 1
- effective_price = calculate_auto_price(base_price=base_price, auto_price_reduction=ad_cfg.auto_price_reduction, target_reduction_cycle=next_cycle)
+ effective_price = calculate_auto_price(base_price = base_price, auto_price_reduction = ad_cfg.auto_price_reduction, target_reduction_cycle = next_cycle)
if effective_price is None:
return
@@ -149,7 +149,7 @@ def apply_auto_price_reduction(ad_cfg: Ad, _ad_cfg_orig: dict[str, Any], ad_file
# Note: price_reduction_count is persisted to ad_cfg_orig only after successful publish
-class KleinanzeigenBot(WebScrapingMixin):
+class KleinanzeigenBot(WebScrapingMixin): # noqa: PLR0904
def __init__(self) -> None:
# workaround for https://github.com/Second-Hand-Friends/kleinanzeigen-bot/issues/295
# see https://github.com/pyinstaller/pyinstaller/issues/7229#issuecomment-1309383026
@@ -159,17 +159,17 @@ class KleinanzeigenBot(WebScrapingMixin):
self.root_url = "https://www.kleinanzeigen.de"
- self.config: Config
+ self.config:Config
self.config_file_path = abspath("config.yaml")
self.config_explicitly_provided = False
- self.installation_mode: xdg_paths.InstallationMode | None = None
+ self.installation_mode:xdg_paths.InstallationMode | None = None
- self.categories: dict[str, str] = {}
+ self.categories:dict[str, str] = {}
- self.file_log: loggers.LogFileHandle | None = None
+ self.file_log:loggers.LogFileHandle | None = None
log_file_basename = is_frozen() and os.path.splitext(os.path.basename(sys.executable))[0] or self.__module__
- self.log_file_path: str | None = abspath(f"{log_file_basename}.log")
+ self.log_file_path:str | None = abspath(f"{log_file_basename}.log")
self.log_file_basename = log_file_basename
self.log_file_explicitly_provided = False
@@ -245,7 +245,7 @@ class KleinanzeigenBot(WebScrapingMixin):
LOG.info(_("Installation mode: %s"), mode_display)
LOG.info(_("Config file: %s"), self.config_file_path)
- async def run(self, args: list[str]) -> None:
+ async def run(self, args:list[str]) -> None:
self.parse_args(args)
self.finalize_installation_mode()
try:
@@ -277,7 +277,7 @@ class KleinanzeigenBot(WebScrapingMixin):
self.configure_file_logging()
self.load_config()
checker = UpdateChecker(self.config, self.installation_mode_or_portable)
- checker.check_for_updates(skip_interval_check=True)
+ checker.check_for_updates(skip_interval_check = True)
case "update-content-hash":
self.configure_file_logging()
self.load_config()
@@ -285,7 +285,7 @@ class KleinanzeigenBot(WebScrapingMixin):
checker = UpdateChecker(self.config, self.installation_mode_or_portable)
checker.check_for_updates()
self.ads_selector = "all"
- if ads := self.load_ads(exclude_ads_with_id=False):
+ if ads := self.load_ads(exclude_ads_with_id = False):
self.update_content_hashes(ads)
else:
LOG.info("############################################")
@@ -503,7 +503,7 @@ class KleinanzeigenBot(WebScrapingMixin):
)
)
- def parse_args(self, args: list[str]) -> None:
+ def parse_args(self, args:list[str]) -> None:
try:
options, arguments = getopt.gnu_getopt(args[1:], "hv", ["ads=", "config=", "force", "help", "keep-old", "logfile=", "lang=", "verbose"])
except getopt.error as ex:
@@ -571,7 +571,7 @@ class KleinanzeigenBot(WebScrapingMixin):
default_config.login.password = "changeme" # noqa: S105 placeholder for default config, not a real password
dicts.save_dict(
self.config_file_path,
- default_config.model_dump(exclude_none=True, exclude={"ad_defaults": {"description"}}),
+ default_config.model_dump(exclude_none = True, exclude = {"ad_defaults": {"description"}}),
header=(
"# yaml-language-server: $schema="
"https://raw.githubusercontent.com/Second-Hand-Friends/kleinanzeigen-bot"
@@ -585,7 +585,7 @@ class KleinanzeigenBot(WebScrapingMixin):
self.create_default_config()
config_yaml = dicts.load_dict_if_exists(self.config_file_path, _("config"))
- self.config = Config.model_validate(config_yaml, strict=True, context=self.config_file_path)
+ self.config = Config.model_validate(config_yaml, strict = True, context = self.config_file_path)
# load built-in category mappings
self.categories = dicts.load_dict_from_module(resources, "categories.yaml", "categories")
@@ -598,13 +598,13 @@ class KleinanzeigenBot(WebScrapingMixin):
# populate browser_config object used by WebScrapingMixin
self.browser_config.arguments = self.config.browser.arguments
self.browser_config.binary_location = self.config.browser.binary_location
- self.browser_config.extensions = [abspath(item, relative_to=self.config_file_path) for item in self.config.browser.extensions]
+ self.browser_config.extensions = [abspath(item, relative_to = self.config_file_path) for item in self.config.browser.extensions]
self.browser_config.use_private_window = self.config.browser.use_private_window
if self.config.browser.user_data_dir:
- self.browser_config.user_data_dir = abspath(self.config.browser.user_data_dir, relative_to=self.config_file_path)
+ self.browser_config.user_data_dir = abspath(self.config.browser.user_data_dir, relative_to = self.config_file_path)
self.browser_config.profile_name = self.config.browser.profile_name
- def __check_ad_republication(self, ad_cfg: Ad, ad_file_relative: str) -> bool:
+ def __check_ad_republication(self, ad_cfg:Ad, ad_file_relative:str) -> bool:
"""
Check if an ad needs to be republished based on republication interval.
Note: This method does not check for content changes. Use __check_ad_changed for that.
@@ -635,7 +635,7 @@ class KleinanzeigenBot(WebScrapingMixin):
return True
- def __check_ad_changed(self, ad_cfg: Ad, ad_cfg_orig: dict[str, Any], ad_file_relative: str) -> bool:
+ def __check_ad_changed(self, ad_cfg:Ad, ad_cfg_orig:dict[str, Any], ad_file_relative:str) -> bool:
"""
Check if an ad has been changed since last publication.
@@ -662,7 +662,7 @@ class KleinanzeigenBot(WebScrapingMixin):
return False
- def load_ads(self, *, ignore_inactive: bool = True, exclude_ads_with_id: bool = True) -> list[tuple[str, Ad, dict[str, Any]]]:
+ def load_ads(self, *, ignore_inactive:bool = True, exclude_ads_with_id:bool = True) -> list[tuple[str, Ad, dict[str, Any]]]:
"""
Load and validate all ad config files, optionally filtering out inactive or already-published ads.
@@ -678,12 +678,12 @@ class KleinanzeigenBot(WebScrapingMixin):
"""
LOG.info("Searching for ad config files...")
- ad_files: dict[str, str] = {}
+ ad_files:dict[str, str] = {}
data_root_dir = os.path.dirname(self.config_file_path)
for file_pattern in self.config.ad_files:
- for ad_file in glob.glob(file_pattern, root_dir=data_root_dir, flags=glob.GLOBSTAR | glob.BRACE | glob.EXTGLOB):
+ for ad_file in glob.glob(file_pattern, root_dir = data_root_dir, flags = glob.GLOBSTAR | glob.BRACE | glob.EXTGLOB):
if not str(ad_file).endswith("ad_fields.yaml"):
- ad_files[abspath(ad_file, relative_to=data_root_dir)] = ad_file
+ ad_files[abspath(ad_file, relative_to = data_root_dir)] = ad_file
LOG.info(" -> found %s", pluralize("ad config file", ad_files))
if not ad_files:
return []
@@ -700,8 +700,8 @@ class KleinanzeigenBot(WebScrapingMixin):
ads = []
for ad_file, ad_file_relative in sorted(ad_files.items()):
- ad_cfg_orig: dict[str, Any] = dicts.load_dict(ad_file, "ad")
- ad_cfg: Ad = self.load_ad(ad_cfg_orig)
+ ad_cfg_orig:dict[str, Any] = dicts.load_dict(ad_file, "ad")
+ ad_cfg:Ad = self.load_ad(ad_cfg_orig)
if ignore_inactive and not ad_cfg.active:
LOG.info(" -> SKIPPED: inactive ad [%s]", ad_file_relative)
@@ -738,8 +738,8 @@ class KleinanzeigenBot(WebScrapingMixin):
if not should_include:
continue
- ensure(self.__get_description(ad_cfg, with_affixes=False), f"-> property [description] not specified @ [{ad_file}]")
- self.__get_description(ad_cfg, with_affixes=True) # validates complete description
+ ensure(self.__get_description(ad_cfg, with_affixes = False), f"-> property [description] not specified @ [{ad_file}]")
+ self.__get_description(ad_cfg, with_affixes = True) # validates complete description
if ad_cfg.category:
resolved_category_id = self.categories.get(ad_cfg.category)
@@ -758,13 +758,13 @@ class KleinanzeigenBot(WebScrapingMixin):
ad_dir = os.path.dirname(ad_file)
for image_pattern in ad_cfg.images:
pattern_images = set()
- for image_file in glob.glob(image_pattern, root_dir=ad_dir, flags=glob.GLOBSTAR | glob.BRACE | glob.EXTGLOB):
+ for image_file in glob.glob(image_pattern, root_dir = ad_dir, flags = glob.GLOBSTAR | glob.BRACE | glob.EXTGLOB):
_, image_file_ext = os.path.splitext(image_file)
ensure(image_file_ext.lower() in {".gif", ".jpg", ".jpeg", ".png"}, f"Unsupported image file type [{image_file}]")
if os.path.isabs(image_file):
pattern_images.add(image_file)
else:
- pattern_images.add(abspath(image_file, relative_to=ad_file))
+ pattern_images.add(abspath(image_file, relative_to = ad_file))
images.extend(sorted(pattern_images))
ensure(images or not ad_cfg.images, f"No images found for given file patterns {ad_cfg.images} at {ad_dir}")
ad_cfg.images = list(dict.fromkeys(images))
@@ -774,13 +774,13 @@ class KleinanzeigenBot(WebScrapingMixin):
LOG.info("Loaded %s", pluralize("ad", ads))
return ads
- def load_ad(self, ad_cfg_orig: dict[str, Any]) -> Ad:
+ def load_ad(self, ad_cfg_orig:dict[str, Any]) -> Ad:
return AdPartial.model_validate(ad_cfg_orig).to_ad(self.config.ad_defaults)
- async def check_and_wait_for_captcha(self, *, is_login_page: bool = True) -> None:
+ async def check_and_wait_for_captcha(self, *, is_login_page:bool = True) -> None:
try:
captcha_timeout = self._timeout("captcha_detection")
- await self.web_find(By.CSS_SELECTOR, "iframe[name^='a-'][src^='https://www.google.com/recaptcha/api2/anchor?']", timeout=captcha_timeout)
+ await self.web_find(By.CSS_SELECTOR, "iframe[name^='a-'][src^='https://www.google.com/recaptcha/api2/anchor?']", timeout = captcha_timeout)
if not is_login_page and self.config.captcha.auto_restart:
LOG.warning("Captcha recognized - auto-restart enabled, abort run...")
@@ -833,14 +833,14 @@ class KleinanzeigenBot(WebScrapingMixin):
await self.web_input(By.ID, "login-password", "")
await self.web_input(By.ID, "login-password", self.config.login.password)
- await self.check_and_wait_for_captcha(is_login_page=True)
+ await self.check_and_wait_for_captcha(is_login_page = True)
await self.web_click(By.CSS_SELECTOR, "form#login-form button[type='submit']")
async def handle_after_login_logic(self) -> None:
try:
sms_timeout = self._timeout("sms_verification")
- await self.web_find(By.TEXT, "Wir haben dir gerade einen 6-stelligen Code für die Telefonnummer", timeout=sms_timeout)
+ await self.web_find(By.TEXT, "Wir haben dir gerade einen 6-stelligen Code für die Telefonnummer", timeout = sms_timeout)
LOG.warning("############################################")
LOG.warning("# Device verification message detected. Please follow the instruction displayed in the Browser.")
LOG.warning("############################################")
@@ -852,10 +852,10 @@ class KleinanzeigenBot(WebScrapingMixin):
try:
LOG.info("Handling GDPR disclaimer...")
gdpr_timeout = self._timeout("gdpr_prompt")
- await self.web_find(By.ID, "gdpr-banner-accept", timeout=gdpr_timeout)
+ await self.web_find(By.ID, "gdpr-banner-accept", timeout = gdpr_timeout)
await self.web_click(By.ID, "gdpr-banner-cmp-button")
await self.web_click(
- By.XPATH, "//div[@id='ConsentManagementPage']//*//button//*[contains(., 'Alle ablehnen und fortfahren')]", timeout=gdpr_timeout
+ By.XPATH, "//div[@id='ConsentManagementPage']//*//button//*[contains(., 'Alle ablehnen und fortfahren')]", timeout = gdpr_timeout
)
except TimeoutError:
# GDPR banner not shown within timeout.
@@ -873,7 +873,7 @@ class KleinanzeigenBot(WebScrapingMixin):
# Try to find the standard element first
try:
- user_info = await self.web_text(By.CLASS_NAME, "mr-medium", timeout=login_check_timeout)
+ user_info = await self.web_text(By.CLASS_NAME, "mr-medium", timeout = login_check_timeout)
if username in user_info.lower():
LOG.debug(_("Login detected via .mr-medium element"))
return True
@@ -882,7 +882,7 @@ class KleinanzeigenBot(WebScrapingMixin):
# If standard element not found or didn't contain username, try the alternative
try:
- user_info = await self.web_text(By.ID, "user-email", timeout=login_check_timeout)
+ user_info = await self.web_text(By.ID, "user-email", timeout = login_check_timeout)
if username in user_info.lower():
LOG.debug(_("Login detected via #user-email element"))
return True
@@ -892,7 +892,7 @@ class KleinanzeigenBot(WebScrapingMixin):
LOG.debug(_("No login detected - neither .mr-medium nor #user-email found with username"))
return False
- async def delete_ads(self, ad_cfgs: list[tuple[str, Ad, dict[str, Any]]]) -> None:
+ async def delete_ads(self, ad_cfgs:list[tuple[str, Ad, dict[str, Any]]]) -> None:
count = 0
published_ads = json.loads((await self.web_request(f"{self.root_url}/m-meine-anzeigen-verwalten.json?sort=DEFAULT"))["content"])["ads"]
@@ -900,14 +900,14 @@ class KleinanzeigenBot(WebScrapingMixin):
for ad_file, ad_cfg, _ad_cfg_orig in ad_cfgs:
count += 1
LOG.info("Processing %s/%s: '%s' from [%s]...", count, len(ad_cfgs), ad_cfg.title, ad_file)
- await self.delete_ad(ad_cfg, published_ads, delete_old_ads_by_title=self.config.publishing.delete_old_ads_by_title)
+ await self.delete_ad(ad_cfg, published_ads, delete_old_ads_by_title = self.config.publishing.delete_old_ads_by_title)
await self.web_sleep()
LOG.info("############################################")
LOG.info("DONE: Deleted %s", pluralize("ad", count))
LOG.info("############################################")
- async def delete_ad(self, ad_cfg: Ad, published_ads: list[dict[str, Any]], *, delete_old_ads_by_title: bool) -> bool:
+ async def delete_ad(self, ad_cfg:Ad, published_ads:list[dict[str, Any]], *, delete_old_ads_by_title:bool) -> bool:
LOG.info("Deleting ad '%s' if already present...", ad_cfg.title)
await self.web_open(f"{self.root_url}/m-meine-anzeigen.html")
@@ -922,21 +922,21 @@ class KleinanzeigenBot(WebScrapingMixin):
if ad_cfg.id == published_ad_id or ad_cfg.title == published_ad_title:
LOG.info(" -> deleting %s '%s'...", published_ad_id, published_ad_title)
await self.web_request(
- url=f"{self.root_url}/m-anzeigen-loeschen.json?ids={published_ad_id}", method="POST", headers={"x-csrf-token": str(csrf_token)}
+ url = f"{self.root_url}/m-anzeigen-loeschen.json?ids={published_ad_id}", method = "POST", headers = {"x-csrf-token": str(csrf_token)}
)
elif ad_cfg.id:
await self.web_request(
- url=f"{self.root_url}/m-anzeigen-loeschen.json?ids={ad_cfg.id}",
- method="POST",
- headers={"x-csrf-token": str(csrf_token)},
- valid_response_codes=[200, 404],
+ url = f"{self.root_url}/m-anzeigen-loeschen.json?ids={ad_cfg.id}",
+ method = "POST",
+ headers = {"x-csrf-token": str(csrf_token)},
+ valid_response_codes = [200, 404],
)
await self.web_sleep()
ad_cfg.id = None
return True
- async def extend_ads(self, ad_cfgs: list[tuple[str, Ad, dict[str, Any]]]) -> None:
+ async def extend_ads(self, ad_cfgs:list[tuple[str, Ad, dict[str, Any]]]) -> None:
"""Extends ads that are close to expiry."""
# Fetch currently published ads from API
published_ads = json.loads((await self.web_request(f"{self.root_url}/m-meine-anzeigen-verwalten.json?sort=DEFAULT"))["content"])["ads"]
@@ -986,7 +986,7 @@ class KleinanzeigenBot(WebScrapingMixin):
# Process extensions
success_count = 0
- for idx, (ad_file, ad_cfg, ad_cfg_orig, _published_ad) in enumerate(ads_to_extend, start=1):
+ for idx, (ad_file, ad_cfg, ad_cfg_orig, _published_ad) in enumerate(ads_to_extend, start = 1):
LOG.info(_("Processing %s/%s: '%s' from [%s]..."), idx, len(ads_to_extend), ad_cfg.title, ad_file)
if await self.extend_ad(ad_file, ad_cfg, ad_cfg_orig):
success_count += 1
@@ -996,7 +996,7 @@ class KleinanzeigenBot(WebScrapingMixin):
LOG.info(_("DONE: Extended %s"), pluralize("ad", success_count))
LOG.info("############################################")
- async def extend_ad(self, ad_file: str, ad_cfg: Ad, ad_cfg_orig: dict[str, Any]) -> bool:
+ async def extend_ad(self, ad_file:str, ad_cfg:Ad, ad_cfg_orig:dict[str, Any]) -> bool:
"""Extends a single ad listing."""
LOG.info(_("Extending ad '%s' (ID: %s)..."), ad_cfg.title, ad_cfg.id)
@@ -1021,14 +1021,14 @@ class KleinanzeigenBot(WebScrapingMixin):
# Simply close the dialog with the X button (aria-label="Schließen")
try:
dialog_close_timeout = self._timeout("quick_dom")
- await self.web_click(By.CSS_SELECTOR, 'button[aria-label="Schließen"]', timeout=dialog_close_timeout)
+ await self.web_click(By.CSS_SELECTOR, 'button[aria-label="Schließen"]', timeout = dialog_close_timeout)
LOG.debug(" -> Closed confirmation dialog")
except TimeoutError:
LOG.warning(_(" -> No confirmation dialog found, extension may have completed directly"))
# Update metadata in YAML file
# Update updated_on to track when ad was extended
- ad_cfg_orig["updated_on"] = misc.now().isoformat(timespec="seconds")
+ ad_cfg_orig["updated_on"] = misc.now().isoformat(timespec = "seconds")
dicts.save_dict(ad_file, ad_cfg_orig)
LOG.info(_(" -> SUCCESS: ad extended with ID %s"), ad_cfg.id)
@@ -1045,7 +1045,7 @@ class KleinanzeigenBot(WebScrapingMixin):
# Check for success messages
return await self.web_check(By.ID, "checking-done", Is.DISPLAYED) or await self.web_check(By.ID, "not-completed", Is.DISPLAYED)
- async def publish_ads(self, ad_cfgs: list[tuple[str, Ad, dict[str, Any]]]) -> None:
+ async def publish_ads(self, ad_cfgs:list[tuple[str, Ad, dict[str, Any]]]) -> None:
count = 0
failed_count = 0
max_retries = 3
@@ -1082,12 +1082,12 @@ class KleinanzeigenBot(WebScrapingMixin):
if success:
try:
publish_timeout = self._timeout("publishing_result")
- await self.web_await(self.__check_publishing_result, timeout=publish_timeout)
+ await self.web_await(self.__check_publishing_result, timeout = publish_timeout)
except TimeoutError:
LOG.warning(_(" -> Could not confirm publishing for '%s', but ad may be online"), ad_cfg.title)
if success and self.config.publishing.delete_old_ads == "AFTER_PUBLISH" and not self.keep_old_ads:
- await self.delete_ad(ad_cfg, published_ads, delete_old_ads_by_title=False)
+ await self.delete_ad(ad_cfg, published_ads, delete_old_ads_by_title = False)
LOG.info("############################################")
if failed_count > 0:
@@ -1097,7 +1097,7 @@ class KleinanzeigenBot(WebScrapingMixin):
LOG.info("############################################")
async def publish_ad(
- self, ad_file: str, ad_cfg: Ad, ad_cfg_orig: dict[str, Any], published_ads: list[dict[str, Any]], mode: AdUpdateStrategy = AdUpdateStrategy.REPLACE
+ self, ad_file:str, ad_cfg:Ad, ad_cfg_orig:dict[str, Any], published_ads:list[dict[str, Any]], mode:AdUpdateStrategy = AdUpdateStrategy.REPLACE
) -> None:
"""
@param ad_cfg: the effective ad config (i.e. with default values applied etc.)
@@ -1108,7 +1108,7 @@ class KleinanzeigenBot(WebScrapingMixin):
if mode == AdUpdateStrategy.REPLACE:
if self.config.publishing.delete_old_ads == "BEFORE_PUBLISH" and not self.keep_old_ads:
- await self.delete_ad(ad_cfg, published_ads, delete_old_ads_by_title=self.config.publishing.delete_old_ads_by_title)
+ await self.delete_ad(ad_cfg, published_ads, delete_old_ads_by_title = self.config.publishing.delete_old_ads_by_title)
# Apply auto price reduction only for REPLACE operations (actual reposts)
# This ensures price reductions only happen on republish, not on UPDATE
@@ -1197,12 +1197,12 @@ class KleinanzeigenBot(WebScrapingMixin):
elif not await self.web_check(By.ID, "radio-buy-now-no", Is.SELECTED):
await self.web_click(By.ID, "radio-buy-now-no")
except TimeoutError as ex:
- LOG.debug(ex, exc_info=True)
+ LOG.debug(ex, exc_info = True)
#############################
# set description
#############################
- description = self.__get_description(ad_cfg, with_affixes=True)
+ description = self.__get_description(ad_cfg, with_affixes = True)
await self.web_execute("document.querySelector('#pstad-descrptn').value = `" + description.replace("`", "'") + "`")
await self.__set_contact_fields(ad_cfg.contact)
@@ -1213,7 +1213,7 @@ class KleinanzeigenBot(WebScrapingMixin):
#############################
img_items = await self.web_find_all(By.CSS_SELECTOR, "ul#j-pictureupload-thumbnails > li:not(.is-placeholder)")
for element in img_items:
- btn = await self.web_find(By.CSS_SELECTOR, "button.pictureupload-thumbnails-remove", parent=element)
+ btn = await self.web_find(By.CSS_SELECTOR, "button.pictureupload-thumbnails-remove", parent = element)
await btn.click()
#############################
@@ -1224,7 +1224,7 @@ class KleinanzeigenBot(WebScrapingMixin):
#############################
# wait for captcha
#############################
- await self.check_and_wait_for_captcha(is_login_page=False)
+ await self.check_and_wait_for_captcha(is_login_page = False)
#############################
# submit
@@ -1250,7 +1250,7 @@ class KleinanzeigenBot(WebScrapingMixin):
#############################
try:
short_timeout = self._timeout("quick_dom")
- await self.web_find(By.ID, "myftr-shppngcrt-frm", timeout=short_timeout)
+ await self.web_find(By.ID, "myftr-shppngcrt-frm", timeout = short_timeout)
LOG.warning("############################################")
LOG.warning("# Payment form detected! Please proceed with payment.")
@@ -1262,7 +1262,7 @@ class KleinanzeigenBot(WebScrapingMixin):
pass
confirmation_timeout = self._timeout("publishing_confirmation")
- await self.web_await(lambda: "p-anzeige-aufgeben-bestaetigung.html?adId=" in self.page.url, timeout=confirmation_timeout)
+ await self.web_await(lambda: "p-anzeige-aufgeben-bestaetigung.html?adId=" in self.page.url, timeout = confirmation_timeout)
# extract the ad id from the URL's query parameter
current_url_query_params = urllib_parse.parse_qs(urllib_parse.urlparse(self.page.url).query)
@@ -1272,7 +1272,7 @@ class KleinanzeigenBot(WebScrapingMixin):
# Update content hash after successful publication
# Calculate hash on original config to ensure consistent comparison on restart
ad_cfg_orig["content_hash"] = AdPartial.model_validate(ad_cfg_orig).update_content_hash().content_hash
- ad_cfg_orig["updated_on"] = misc.now().isoformat(timespec="seconds")
+ ad_cfg_orig["updated_on"] = misc.now().isoformat(timespec = "seconds")
if not ad_cfg.created_on and not ad_cfg.id:
ad_cfg_orig["created_on"] = ad_cfg_orig["updated_on"]
@@ -1299,7 +1299,7 @@ class KleinanzeigenBot(WebScrapingMixin):
dicts.save_dict(ad_file, ad_cfg_orig)
- async def __set_contact_fields(self, contact: Contact) -> None:
+ async def __set_contact_fields(self, contact:Contact) -> None:
#############################
# set contact zipcode
#############################
@@ -1384,7 +1384,7 @@ class KleinanzeigenBot(WebScrapingMixin):
)
)
- async def update_ads(self, ad_cfgs: list[tuple[str, Ad, dict[str, Any]]]) -> None:
+ async def update_ads(self, ad_cfgs:list[tuple[str, Ad, dict[str, Any]]]) -> None:
"""
Updates a list of ads.
The list gets filtered, so that only already published ads will be updated.
@@ -1415,25 +1415,25 @@ class KleinanzeigenBot(WebScrapingMixin):
await self.publish_ad(ad_file, ad_cfg, ad_cfg_orig, published_ads, AdUpdateStrategy.MODIFY)
publish_timeout = self._timeout("publishing_result")
- await self.web_await(self.__check_publishing_result, timeout=publish_timeout)
+ await self.web_await(self.__check_publishing_result, timeout = publish_timeout)
LOG.info("############################################")
LOG.info("DONE: updated %s", pluralize("ad", count))
LOG.info("############################################")
- async def __set_condition(self, condition_value: str) -> None:
+ async def __set_condition(self, condition_value:str) -> None:
try:
# Open condition dialog
await self.web_click(By.XPATH, '//*[@id="j-post-listing-frontend-conditions"]//button[@aria-haspopup="true"]')
except TimeoutError:
- LOG.debug("Unable to open condition dialog and select condition [%s]", condition_value, exc_info=True)
+ LOG.debug("Unable to open condition dialog and select condition [%s]", condition_value, exc_info = True)
return
try:
# Click radio button
await self.web_click(By.ID, f"radio-button-{condition_value}")
except TimeoutError:
- LOG.debug("Unable to select condition [%s]", condition_value, exc_info=True)
+ LOG.debug("Unable to select condition [%s]", condition_value, exc_info = True)
try:
# Click accept button
@@ -1441,7 +1441,7 @@ class KleinanzeigenBot(WebScrapingMixin):
except TimeoutError as ex:
raise TimeoutError(_("Unable to close condition dialog!")) from ex
- async def __set_category(self, category: str | None, ad_file: str) -> None:
+ async def __set_category(self, category:str | None, ad_file:str) -> None:
# click on something to trigger automatic category detection
await self.web_click(By.ID, "pstad-descrptn")
@@ -1464,7 +1464,7 @@ class KleinanzeigenBot(WebScrapingMixin):
else:
ensure(is_category_auto_selected, f"No category specified in [{ad_file}] and automatic category detection failed")
- async def __set_special_attributes(self, ad_cfg: Ad) -> None:
+ async def __set_special_attributes(self, ad_cfg:Ad) -> None:
if not ad_cfg.special_attributes:
return
@@ -1499,7 +1499,7 @@ class KleinanzeigenBot(WebScrapingMixin):
raise TimeoutError(_("Failed to set attribute '%s'") % special_attribute_key) from ex
try:
- elem_id: str = str(special_attr_elem.attrs.id)
+ elem_id:str = str(special_attr_elem.attrs.id)
if special_attr_elem.local_name == "select":
LOG.debug(_("Attribute field '%s' seems to be a select..."), special_attribute_key)
await self.web_select(By.ID, elem_id, special_attribute_value_str)
@@ -1517,26 +1517,26 @@ class KleinanzeigenBot(WebScrapingMixin):
raise TimeoutError(_("Failed to set attribute '%s'") % special_attribute_key) from ex
LOG.debug("Successfully set attribute field [%s] to [%s]...", special_attribute_key, special_attribute_value_str)
- async def __set_shipping(self, ad_cfg: Ad, mode: AdUpdateStrategy = AdUpdateStrategy.REPLACE) -> None:
+ async def __set_shipping(self, ad_cfg:Ad, mode:AdUpdateStrategy = AdUpdateStrategy.REPLACE) -> None:
short_timeout = self._timeout("quick_dom")
if ad_cfg.shipping_type == "PICKUP":
try:
await self.web_click(By.ID, "radio-pickup")
except TimeoutError as ex:
- LOG.debug(ex, exc_info=True)
+ LOG.debug(ex, exc_info = True)
elif ad_cfg.shipping_options:
await self.web_click(By.XPATH, '//button//span[contains(., "Versandmethoden auswählen")]')
if mode == AdUpdateStrategy.MODIFY:
try:
# when "Andere Versandmethoden" is not available, go back and start over new
- await self.web_find(By.XPATH, '//dialog//button[contains(., "Andere Versandmethoden")]', timeout=short_timeout)
+ await self.web_find(By.XPATH, '//dialog//button[contains(., "Andere Versandmethoden")]', timeout = short_timeout)
except TimeoutError:
await self.web_click(By.XPATH, '//dialog//button[contains(., "Zurück")]')
# in some categories we need to go another dialog back
try:
- await self.web_find(By.XPATH, '//dialog//button[contains(., "Andere Versandmethoden")]', timeout=short_timeout)
+ await self.web_find(By.XPATH, '//dialog//button[contains(., "Andere Versandmethoden")]', timeout = short_timeout)
except TimeoutError:
await self.web_click(By.XPATH, '//dialog//button[contains(., "Zurück")]')
@@ -1562,7 +1562,7 @@ class KleinanzeigenBot(WebScrapingMixin):
try:
# only click on "Individueller Versand" when "IndividualShippingInput" is not available, otherwise its already checked
# (important for mode = UPDATE)
- await self.web_find(By.XPATH, '//input[contains(@placeholder, "Versandkosten (optional)")]', timeout=short_timeout)
+ await self.web_find(By.XPATH, '//input[contains(@placeholder, "Versandkosten (optional)")]', timeout = short_timeout)
except TimeoutError:
# Input not visible yet; click the individual shipping option.
await self.web_click(By.XPATH, '//*[contains(@id, "INDIVIDUAL") and contains(@data-testid, "Individueller Versand")]')
@@ -1573,10 +1573,10 @@ class KleinanzeigenBot(WebScrapingMixin):
)
await self.web_click(By.XPATH, '//dialog//button[contains(., "Fertig")]')
except TimeoutError as ex:
- LOG.debug(ex, exc_info=True)
+ LOG.debug(ex, exc_info = True)
raise TimeoutError(_("Unable to close shipping dialog!")) from ex
- async def __set_shipping_options(self, ad_cfg: Ad, mode: AdUpdateStrategy = AdUpdateStrategy.REPLACE) -> None:
+ async def __set_shipping_options(self, ad_cfg:Ad, mode:AdUpdateStrategy = AdUpdateStrategy.REPLACE) -> None:
if not ad_cfg.shipping_options:
return
@@ -1596,7 +1596,7 @@ class KleinanzeigenBot(WebScrapingMixin):
except KeyError as ex:
raise KeyError(f"Unknown shipping option(s), please refer to the documentation/README: {ad_cfg.shipping_options}") from ex
- shipping_sizes, shipping_selector, shipping_packages = zip(*mapped_shipping_options, strict=False)
+ shipping_sizes, shipping_selector, shipping_packages = zip(*mapped_shipping_options, strict = False)
try:
(shipping_size,) = set(shipping_sizes)
@@ -1652,19 +1652,19 @@ class KleinanzeigenBot(WebScrapingMixin):
for shipping_package in to_be_clicked_shipping_packages:
await self.web_click(By.XPATH, f'//dialog//input[contains(@data-testid, "{shipping_package}")]')
except TimeoutError as ex:
- LOG.debug(ex, exc_info=True)
+ LOG.debug(ex, exc_info = True)
try:
# Click apply button
await self.web_click(By.XPATH, '//dialog//button[contains(., "Fertig")]')
except TimeoutError as ex:
raise TimeoutError(_("Unable to close shipping dialog!")) from ex
- async def __upload_images(self, ad_cfg: Ad) -> None:
+ async def __upload_images(self, ad_cfg:Ad) -> None:
if not ad_cfg.images:
return
LOG.info(" -> found %s", pluralize("image", ad_cfg.images))
- image_upload: Element = await self.web_find(By.CSS_SELECTOR, "input[type=file]")
+ image_upload:Element = await self.web_find(By.CSS_SELECTOR, "input[type=file]")
for image in ad_cfg.images:
LOG.info(" -> uploading image [%s]", image)
@@ -1680,7 +1680,7 @@ class KleinanzeigenBot(WebScrapingMixin):
thumbnails = await self.web_find_all(
By.CSS_SELECTOR,
"ul#j-pictureupload-thumbnails > li:not(.is-placeholder)",
- timeout=self._timeout("quick_dom"), # Fast timeout for polling
+ timeout = self._timeout("quick_dom"), # Fast timeout for polling
)
current_count = len(thumbnails)
if current_count < expected_count:
@@ -1691,12 +1691,12 @@ class KleinanzeigenBot(WebScrapingMixin):
return False
try:
- await self.web_await(check_thumbnails_uploaded, timeout=self._timeout("image_upload"), timeout_error_message=_("Image upload timeout exceeded"))
+ await self.web_await(check_thumbnails_uploaded, timeout = self._timeout("image_upload"), timeout_error_message = _("Image upload timeout exceeded"))
except TimeoutError as ex:
# Get current count for better error message
try:
thumbnails = await self.web_find_all(
- By.CSS_SELECTOR, "ul#j-pictureupload-thumbnails > li:not(.is-placeholder)", timeout=self._timeout("quick_dom")
+ By.CSS_SELECTOR, "ul#j-pictureupload-thumbnails > li:not(.is-placeholder)", timeout = self._timeout("quick_dom")
)
current_count = len(thumbnails)
except TimeoutError:
@@ -1738,7 +1738,7 @@ class KleinanzeigenBot(WebScrapingMixin):
elif self.ads_selector == "new": # download only unsaved ads
# check which ads already saved
saved_ad_ids = []
- ads = self.load_ads(ignore_inactive=False, exclude_ads_with_id=False) # do not skip because of existing IDs
+ ads = self.load_ads(ignore_inactive = False, exclude_ads_with_id = False) # do not skip because of existing IDs
for ad in ads:
saved_ad_id = ad[1].id
if saved_ad_id is None:
@@ -1775,7 +1775,7 @@ class KleinanzeigenBot(WebScrapingMixin):
else:
LOG.error("The page with the id %d does not exist!", ad_id)
- def __get_description(self, ad_cfg: Ad, *, with_affixes: bool) -> str:
+ def __get_description(self, ad_cfg:Ad, *, with_affixes:bool) -> str:
"""Get the ad description optionally with prefix and suffix applied.
Precedence (highest to lowest):
@@ -1827,7 +1827,7 @@ class KleinanzeigenBot(WebScrapingMixin):
return final_description
- def update_content_hashes(self, ads: list[tuple[str, Ad, dict[str, Any]]]) -> None:
+ def update_content_hashes(self, ads:list[tuple[str, Ad, dict[str, Any]]]) -> None:
count = 0
for ad_file, ad_cfg, ad_cfg_orig in ads:
@@ -1848,7 +1848,7 @@ class KleinanzeigenBot(WebScrapingMixin):
#############################
-def main(args: list[str]) -> None:
+def main(args:list[str]) -> None:
if "version" not in args:
print(
textwrap.dedent(rf"""
@@ -1861,7 +1861,7 @@ def main(args: list[str]) -> None:
https://github.com/Second-Hand-Friends/kleinanzeigen-bot
Version: {__version__}
""")[1:],
- flush=True,
+ flush = True,
) # [1:] removes the first empty blank line
loggers.configure_console_logging()
diff --git a/src/kleinanzeigen_bot/model/config_model.py b/src/kleinanzeigen_bot/model/config_model.py
index 9ad7438..e426a28 100644
--- a/src/kleinanzeigen_bot/model/config_model.py
+++ b/src/kleinanzeigen_bot/model/config_model.py
@@ -15,22 +15,22 @@ from kleinanzeigen_bot.utils import dicts
from kleinanzeigen_bot.utils.misc import get_attr
from kleinanzeigen_bot.utils.pydantics import ContextualModel
-_MAX_PERCENTAGE: Final[int] = 100
+_MAX_PERCENTAGE:Final[int] = 100
class AutoPriceReductionConfig(ContextualModel):
- enabled: bool = Field(default=False, description="automatically lower the price of reposted ads")
- strategy: Literal["FIXED", "PERCENTAGE"] | None = Field(
- default=None, description="PERCENTAGE reduces by a percentage of the previous price, FIXED reduces by a fixed amount"
+ enabled:bool = Field(default = False, description = "automatically lower the price of reposted ads")
+ strategy:Literal["FIXED", "PERCENTAGE"] | None = Field(
+ default = None, description = "PERCENTAGE reduces by a percentage of the previous price, FIXED reduces by a fixed amount"
)
- amount: float | None = Field(
- default=None, gt=0, description="magnitude of the reduction; interpreted as percent for PERCENTAGE or currency units for FIXED"
+ amount:float | None = Field(
+ default = None, gt = 0, description = "magnitude of the reduction; interpreted as percent for PERCENTAGE or currency units for FIXED"
)
- min_price: float | None = Field(default=None, ge=0, description="required when enabled is true; minimum price floor (use 0 for no lower bound)")
- delay_reposts: int = Field(default=0, ge=0, description="number of reposts to wait before applying the first automatic price reduction")
- delay_days: int = Field(default=0, ge=0, description="number of days to wait after publication before applying automatic price reductions")
+ min_price:float | None = Field(default = None, ge = 0, description = "required when enabled is true; minimum price floor (use 0 for no lower bound)")
+ delay_reposts:int = Field(default = 0, ge = 0, description = "number of reposts to wait before applying the first automatic price reduction")
+ delay_days:int = Field(default = 0, ge = 0, description = "number of days to wait after publication before applying automatic price reductions")
- @model_validator(mode="after")
+ @model_validator(mode = "after")
def _validate_config(self) -> "AutoPriceReductionConfig":
if self.enabled:
if self.strategy is None:
@@ -45,38 +45,38 @@ class AutoPriceReductionConfig(ContextualModel):
class ContactDefaults(ContextualModel):
- name: str | None = None
- street: str | None = None
- zipcode: int | str | None = None
- location: str | None = Field(
- default=None, description="city or locality of the listing (can include multiple districts)", examples=["Sample Town - District One"]
+ name:str | None = None
+ street:str | None = None
+ zipcode:int | str | None = None
+ location:str | None = Field(
+ default = None, description = "city or locality of the listing (can include multiple districts)", examples = ["Sample Town - District One"]
)
- phone: str | None = None
+ phone:str | None = None
@deprecated("Use description_prefix/description_suffix instead")
class DescriptionAffixes(ContextualModel):
- prefix: str | None = None
- suffix: str | None = None
+ prefix:str | None = None
+ suffix:str | None = None
class AdDefaults(ContextualModel):
- active: bool = True
- type: Literal["OFFER", "WANTED"] = "OFFER"
- description: DescriptionAffixes | None = None
- description_prefix: str | None = Field(default=None, description="prefix for the ad description")
- description_suffix: str | None = Field(default=None, description=" suffix for the ad description")
- price_type: Literal["FIXED", "NEGOTIABLE", "GIVE_AWAY", "NOT_APPLICABLE"] = "NEGOTIABLE"
- auto_price_reduction: AutoPriceReductionConfig = Field(default_factory=AutoPriceReductionConfig, description="automatic price reduction configuration")
- shipping_type: Literal["PICKUP", "SHIPPING", "NOT_APPLICABLE"] = "SHIPPING"
- sell_directly: bool = Field(default=False, description="requires shipping_type SHIPPING to take effect")
- images: list[str] | None = Field(default=None)
- contact: ContactDefaults = Field(default_factory=ContactDefaults)
- republication_interval: int = 7
+ active:bool = True
+ type:Literal["OFFER", "WANTED"] = "OFFER"
+ description:DescriptionAffixes | None = None
+ description_prefix:str | None = Field(default = None, description = "prefix for the ad description")
+ description_suffix:str | None = Field(default = None, description = "suffix for the ad description")
+ price_type:Literal["FIXED", "NEGOTIABLE", "GIVE_AWAY", "NOT_APPLICABLE"] = "NEGOTIABLE"
+ auto_price_reduction:AutoPriceReductionConfig = Field(default_factory = AutoPriceReductionConfig, description = "automatic price reduction configuration")
+ shipping_type:Literal["PICKUP", "SHIPPING", "NOT_APPLICABLE"] = "SHIPPING"
+ sell_directly:bool = Field(default = False, description = "requires shipping_type SHIPPING to take effect")
+ images:list[str] | None = Field(default = None)
+ contact:ContactDefaults = Field(default_factory = ContactDefaults)
+ republication_interval:int = 7
- @model_validator(mode="before")
+ @model_validator(mode = "before")
@classmethod
- def migrate_legacy_description(cls, values: dict[str, Any]) -> dict[str, Any]:
+ def migrate_legacy_description(cls, values:dict[str, Any]) -> dict[str, Any]:
# Ensure flat prefix/suffix take precedence over deprecated nested "description"
description_prefix = values.get("description_prefix")
description_suffix = values.get("description_suffix")
@@ -91,71 +91,74 @@ class AdDefaults(ContextualModel):
class DownloadConfig(ContextualModel):
- include_all_matching_shipping_options: bool = Field(default=False, description="if true, all shipping options matching the package size will be included")
- excluded_shipping_options: list[str] = Field(default_factory=list, description="list of shipping options to exclude, e.g. ['DHL_2', 'DHL_5']")
- folder_name_max_length: int = Field(default=100, ge=10, le=255, description="maximum length for folder names when downloading ads (default: 100)")
- rename_existing_folders: bool = Field(default=False, description="if true, rename existing folders without titles to include titles (default: false)")
+ include_all_matching_shipping_options:bool = Field(
+ default = False,
+ description = "if true, all shipping options matching the package size will be included",
+ )
+ excluded_shipping_options:list[str] = Field(default_factory = list, description = "list of shipping options to exclude, e.g. ['DHL_2', 'DHL_5']")
+ folder_name_max_length:int = Field(default = 100, ge = 10, le = 255, description = "maximum length for folder names when downloading ads (default: 100)")
+ rename_existing_folders:bool = Field(default = False, description = "if true, rename existing folders without titles to include titles (default: false)")
class BrowserConfig(ContextualModel):
- arguments: list[str] = Field(
- default_factory=list,
+ arguments:list[str] = Field(
+ default_factory = list,
description=(
"See https://peter.sh/experiments/chromium-command-line-switches/. "
"Browser profile path is auto-configured based on installation mode (portable/XDG)."
),
)
- binary_location: str | None = Field(default=None, description="path to custom browser executable, if not specified will be looked up on PATH")
- extensions: list[str] = Field(default_factory=list, description="a list of .crx extension files to be loaded")
- use_private_window: bool = True
- user_data_dir: str | None = Field(
- default=None,
+ binary_location:str | None = Field(default = None, description = "path to custom browser executable, if not specified will be looked up on PATH")
+ extensions:list[str] = Field(default_factory = list, description = "a list of .crx extension files to be loaded")
+ use_private_window:bool = True
+ user_data_dir:str | None = Field(
+ default = None,
description=(
"See https://github.com/chromium/chromium/blob/main/docs/user_data_dir.md. "
"If not specified, defaults to XDG cache directory in XDG mode or .temp/browser-profile in portable mode."
),
)
- profile_name: str | None = None
+ profile_name:str | None = None
class LoginConfig(ContextualModel):
- username: str = Field(..., min_length=1)
- password: str = Field(..., min_length=1)
+ username:str = Field(..., min_length = 1)
+ password:str = Field(..., min_length = 1)
class PublishingConfig(ContextualModel):
- delete_old_ads: Literal["BEFORE_PUBLISH", "AFTER_PUBLISH", "NEVER"] | None = "AFTER_PUBLISH"
- delete_old_ads_by_title: bool = Field(default=True, description="only works if delete_old_ads is set to BEFORE_PUBLISH")
+ delete_old_ads:Literal["BEFORE_PUBLISH", "AFTER_PUBLISH", "NEVER"] | None = "AFTER_PUBLISH"
+ delete_old_ads_by_title:bool = Field(default = True, description = "only works if delete_old_ads is set to BEFORE_PUBLISH")
class CaptchaConfig(ContextualModel):
- auto_restart: bool = False
- restart_delay: str = "6h"
+ auto_restart:bool = False
+ restart_delay:str = "6h"
class TimeoutConfig(ContextualModel):
- multiplier: float = Field(default=1.0, ge=0.1, description="Global multiplier applied to all timeout values.")
- default: float = Field(default=5.0, ge=0.0, description="Baseline timeout for DOM interactions.")
- page_load: float = Field(default=15.0, ge=1.0, description="Page load timeout for web_open.")
- captcha_detection: float = Field(default=2.0, ge=0.1, description="Timeout for captcha iframe detection.")
- sms_verification: float = Field(default=4.0, ge=0.1, description="Timeout for SMS verification prompts.")
- gdpr_prompt: float = Field(default=10.0, ge=1.0, description="Timeout for GDPR/consent dialogs.")
- login_detection: float = Field(default=10.0, ge=1.0, description="Timeout for detecting existing login session via DOM elements.")
- publishing_result: float = Field(default=300.0, ge=10.0, description="Timeout for publishing result checks.")
- publishing_confirmation: float = Field(default=20.0, ge=1.0, description="Timeout for publish confirmation redirect.")
- image_upload: float = Field(default=30.0, ge=5.0, description="Timeout for image upload and server-side processing.")
- pagination_initial: float = Field(default=10.0, ge=1.0, description="Timeout for initial pagination lookup.")
- pagination_follow_up: float = Field(default=5.0, ge=1.0, description="Timeout for subsequent pagination navigation.")
- quick_dom: float = Field(default=2.0, ge=0.1, description="Generic short timeout for transient UI.")
- update_check: float = Field(default=10.0, ge=1.0, description="Timeout for GitHub update checks.")
- chrome_remote_probe: float = Field(default=2.0, ge=0.1, description="Timeout for local remote-debugging probes.")
- chrome_remote_debugging: float = Field(default=5.0, ge=1.0, description="Timeout for remote debugging API calls.")
- chrome_binary_detection: float = Field(default=10.0, ge=1.0, description="Timeout for chrome --version subprocesses.")
- retry_enabled: bool = Field(default=True, description="Enable built-in retry/backoff for DOM operations.")
- retry_max_attempts: int = Field(default=2, ge=1, description="Max retry attempts when retry is enabled.")
- retry_backoff_factor: float = Field(default=1.5, ge=1.0, description="Exponential factor applied per retry attempt.")
+ multiplier:float = Field(default = 1.0, ge = 0.1, description = "Global multiplier applied to all timeout values.")
+ default:float = Field(default = 5.0, ge = 0.0, description = "Baseline timeout for DOM interactions.")
+ page_load:float = Field(default = 15.0, ge = 1.0, description = "Page load timeout for web_open.")
+ captcha_detection:float = Field(default = 2.0, ge = 0.1, description = "Timeout for captcha iframe detection.")
+ sms_verification:float = Field(default = 4.0, ge = 0.1, description = "Timeout for SMS verification prompts.")
+ gdpr_prompt:float = Field(default = 10.0, ge = 1.0, description = "Timeout for GDPR/consent dialogs.")
+ login_detection:float = Field(default = 10.0, ge = 1.0, description = "Timeout for detecting existing login session via DOM elements.")
+ publishing_result:float = Field(default = 300.0, ge = 10.0, description = "Timeout for publishing result checks.")
+ publishing_confirmation:float = Field(default = 20.0, ge = 1.0, description = "Timeout for publish confirmation redirect.")
+ image_upload:float = Field(default = 30.0, ge = 5.0, description = "Timeout for image upload and server-side processing.")
+ pagination_initial:float = Field(default = 10.0, ge = 1.0, description = "Timeout for initial pagination lookup.")
+ pagination_follow_up:float = Field(default = 5.0, ge = 1.0, description = "Timeout for subsequent pagination navigation.")
+ quick_dom:float = Field(default = 2.0, ge = 0.1, description = "Generic short timeout for transient UI.")
+ update_check:float = Field(default = 10.0, ge = 1.0, description = "Timeout for GitHub update checks.")
+ chrome_remote_probe:float = Field(default = 2.0, ge = 0.1, description = "Timeout for local remote-debugging probes.")
+ chrome_remote_debugging:float = Field(default = 5.0, ge = 1.0, description = "Timeout for remote debugging API calls.")
+ chrome_binary_detection:float = Field(default = 10.0, ge = 1.0, description = "Timeout for chrome --version subprocesses.")
+ retry_enabled:bool = Field(default = True, description = "Enable built-in retry/backoff for DOM operations.")
+ retry_max_attempts:int = Field(default = 2, ge = 1, description = "Max retry attempts when retry is enabled.")
+ retry_backoff_factor:float = Field(default = 1.5, ge = 1.0, description = "Exponential factor applied per retry attempt.")
- def resolve(self, key: str = "default", override: float | None = None) -> float:
+ def resolve(self, key:str = "default", override:float | None = None) -> float:
"""
Return the base timeout (seconds) for the given key without applying modifiers.
"""
@@ -171,7 +174,7 @@ class TimeoutConfig(ContextualModel):
return float(self.default)
- def effective(self, key: str = "default", override: float | None = None, *, attempt: int = 0) -> float:
+ def effective(self, key:str = "default", override:float | None = None, *, attempt:int = 0) -> float:
"""
Return the effective timeout (seconds) with multiplier/backoff applied.
"""
@@ -180,7 +183,7 @@ class TimeoutConfig(ContextualModel):
return base * self.multiplier * backoff
-def _validate_glob_pattern(v: str) -> str:
+def _validate_glob_pattern(v:str) -> str:
if not v.strip():
raise ValueError("must be a non-empty, non-blank glob pattern")
return v
@@ -190,20 +193,20 @@ GlobPattern = Annotated[str, AfterValidator(_validate_glob_pattern)]
class Config(ContextualModel):
- ad_files: list[GlobPattern] = Field(
- default_factory=lambda: ["./**/ad_*.{json,yml,yaml}"],
- min_items=1,
- description="""
+ ad_files:list[GlobPattern] = Field(
+ default_factory = lambda: ["./**/ad_*.{json,yml,yaml}"],
+ min_items = 1,
+ description = """
glob (wildcard) patterns to select ad configuration files
if relative paths are specified, then they are relative to this configuration file
""",
) # type: ignore[call-overload]
- ad_defaults: AdDefaults = Field(default_factory=AdDefaults, description="Default values for ads, can be overwritten in each ad configuration file")
+ ad_defaults:AdDefaults = Field(default_factory = AdDefaults, description = "Default values for ads, can be overwritten in each ad configuration file")
- categories: dict[str, str] = Field(
- default_factory=dict,
- description="""
+ categories:dict[str, str] = Field(
+ default_factory = dict,
+ description = """
additional name to category ID mappings, see default list at
https://github.com/Second-Hand-Friends/kleinanzeigen-bot/blob/main/src/kleinanzeigen_bot/resources/categories.yaml
@@ -214,13 +217,13 @@ Example:
""",
)
- download: DownloadConfig = Field(default_factory=DownloadConfig)
- publishing: PublishingConfig = Field(default_factory=PublishingConfig)
- browser: BrowserConfig = Field(default_factory=BrowserConfig, description="Browser configuration")
- login: LoginConfig = Field(default_factory=LoginConfig.model_construct, description="Login credentials")
- captcha: CaptchaConfig = Field(default_factory=CaptchaConfig)
- update_check: UpdateCheckConfig = Field(default_factory=UpdateCheckConfig, description="Update check configuration")
- timeouts: TimeoutConfig = Field(default_factory=TimeoutConfig, description="Centralized timeout configuration.")
+ download:DownloadConfig = Field(default_factory = DownloadConfig)
+ publishing:PublishingConfig = Field(default_factory = PublishingConfig)
+ browser:BrowserConfig = Field(default_factory = BrowserConfig, description = "Browser configuration")
+ login:LoginConfig = Field(default_factory = LoginConfig.model_construct, description = "Login credentials")
+ captcha:CaptchaConfig = Field(default_factory = CaptchaConfig)
+ update_check:UpdateCheckConfig = Field(default_factory = UpdateCheckConfig, description = "Update check configuration")
+ timeouts:TimeoutConfig = Field(default_factory = TimeoutConfig, description = "Centralized timeout configuration.")
- def with_values(self, values: dict[str, Any]) -> Config:
- return Config.model_validate(dicts.apply_defaults(copy.deepcopy(values), defaults=self.model_dump()))
+ def with_values(self, values:dict[str, Any]) -> Config:
+ return Config.model_validate(dicts.apply_defaults(copy.deepcopy(values), defaults = self.model_dump()))
diff --git a/src/kleinanzeigen_bot/resources/translations.de.yaml b/src/kleinanzeigen_bot/resources/translations.de.yaml
index 0f0b443..cd2f48a 100644
--- a/src/kleinanzeigen_bot/resources/translations.de.yaml
+++ b/src/kleinanzeigen_bot/resources/translations.de.yaml
@@ -457,6 +457,9 @@ kleinanzeigen_bot/utils/web_scraping_mixin.py:
" -> Browser profile name: %s": " -> Browser-Profilname: %s"
" -> Browser user data dir: %s": " -> Browser-Benutzerdatenverzeichnis: %s"
" -> Custom Browser argument: %s": " -> Benutzerdefiniertes Browser-Argument: %s"
+ "Ignoring empty --user-data-dir= argument; falling back to configured user_data_dir.": "Ignoriere leeres --user-data-dir= Argument; verwende konfiguriertes user_data_dir."
+ "Configured browser.user_data_dir (%s) does not match --user-data-dir argument (%s); using the argument value.": "Konfiguriertes browser.user_data_dir (%s) stimmt nicht mit --user-data-dir Argument (%s) überein; verwende Argument-Wert."
+ "Remote debugging detected, but browser configuration looks invalid: %s": "Remote-Debugging erkannt, aber Browser-Konfiguration scheint ungültig: %s"
" -> Setting chrome prefs [%s]...": " -> Setze Chrome-Einstellungen [%s]..."
" -> Adding Browser extension: [%s]": " -> Füge Browser-Erweiterung hinzu: [%s]"
"Failed to connect to browser. This error often occurs when:": "Fehler beim Verbinden mit dem Browser. Dieser Fehler tritt häufig auf, wenn:"
@@ -546,8 +549,8 @@ kleinanzeigen_bot/utils/web_scraping_mixin.py:
" -> Unexpected error during browser version validation, skipping: %s": " -> Unerwarteter Fehler bei Browser-Versionsvalidierung, wird übersprungen: %s"
_diagnose_chrome_version_issues:
- "(info) %s version from binary: %s %s (major: %d)": "(Info) %s-Version von Binärdatei: %s %s (Hauptversion: %d)"
- "(info) %s version from remote debugging: %s %s (major: %d)": "(Info) %s-Version von Remote-Debugging: %s %s (Hauptversion: %d)"
+ "(info) %s version from binary: %s (major: %d)": "(Info) %s-Version von Binärdatei: %s (Hauptversion: %d)"
+ "(info) %s version from remote debugging: %s (major: %d)": "(Info) %s-Version von Remote-Debugging: %s (Hauptversion: %d)"
"(info) %s 136+ detected - security validation required": "(Info) %s 136+ erkannt - Sicherheitsvalidierung erforderlich"
"(info) %s pre-136 detected - no special security requirements": "(Info) %s vor 136 erkannt - keine besonderen Sicherheitsanforderungen"
"(info) Remote %s 136+ detected - validating configuration": "(Info) Remote %s 136+ erkannt - validiere Konfiguration"
diff --git a/src/kleinanzeigen_bot/update_checker.py b/src/kleinanzeigen_bot/update_checker.py
index ad5cebd..7a3fbec 100644
--- a/src/kleinanzeigen_bot/update_checker.py
+++ b/src/kleinanzeigen_bot/update_checker.py
@@ -31,7 +31,7 @@ colorama.init()
class UpdateChecker:
"""Checks for updates to the bot."""
- def __init__(self, config: "Config", installation_mode: str | xdg_paths.InstallationMode = "portable") -> None:
+ def __init__(self, config:"Config", installation_mode:str | xdg_paths.InstallationMode = "portable") -> None:
"""Initialize the update checker.
Args:
@@ -55,7 +55,7 @@ class UpdateChecker:
"""Return the effective timeout for HTTP calls."""
return self.config.timeouts.effective("update_check")
- def _get_commit_hash(self, version: str) -> str | None:
+ def _get_commit_hash(self, version:str) -> str | None:
"""Extract the commit hash from a version string.
Args:
@@ -68,7 +68,7 @@ class UpdateChecker:
return version.split("+")[1]
return None
- def _resolve_commitish(self, commitish: str) -> tuple[str | None, datetime | None]:
+ def _resolve_commitish(self, commitish:str) -> tuple[str | None, datetime | None]:
"""Resolve a commit-ish to a full commit hash and date.
Args:
@@ -80,7 +80,7 @@ class UpdateChecker:
try:
response = requests.get(
f"https://api.github.com/repos/Second-Hand-Friends/kleinanzeigen-bot/commits/{commitish}",
- timeout=self._request_timeout(),
+ timeout = self._request_timeout(),
)
response.raise_for_status()
data = response.json()
@@ -96,7 +96,7 @@ class UpdateChecker:
logger.warning(_("Could not resolve commit '%s': %s"), commitish, e)
return None, None
- def _get_short_commit_hash(self, commit: str) -> str:
+ def _get_short_commit_hash(self, commit:str) -> str:
"""Get the short version of a commit hash.
Args:
@@ -107,7 +107,7 @@ class UpdateChecker:
"""
return commit[:7]
- def _commits_match(self, local_commit: str, release_commit: str) -> bool:
+ def _commits_match(self, local_commit:str, release_commit:str) -> bool:
"""Determine whether two commits refer to the same hash.
This accounts for short vs. full hashes (e.g. 7 chars vs. 40 chars).
@@ -120,7 +120,7 @@ class UpdateChecker:
return True
return len(release_commit) < len(local_commit) and local_commit.startswith(release_commit)
- def check_for_updates(self, *, skip_interval_check: bool = False) -> None:
+ def check_for_updates(self, *, skip_interval_check:bool = False) -> None:
"""Check for updates to the bot.
Args:
@@ -147,7 +147,7 @@ class UpdateChecker:
try:
if self.config.update_check.channel == "latest":
# Use /releases/latest endpoint for stable releases
- response = requests.get("https://api.github.com/repos/Second-Hand-Friends/kleinanzeigen-bot/releases/latest", timeout=self._request_timeout())
+ response = requests.get("https://api.github.com/repos/Second-Hand-Friends/kleinanzeigen-bot/releases/latest", timeout = self._request_timeout())
response.raise_for_status()
release = response.json()
# Defensive: ensure it's not a prerelease
@@ -156,7 +156,7 @@ class UpdateChecker:
return
elif self.config.update_check.channel == "preview":
# Use /releases endpoint and select the most recent prerelease
- response = requests.get("https://api.github.com/repos/Second-Hand-Friends/kleinanzeigen-bot/releases", timeout=self._request_timeout())
+ response = requests.get("https://api.github.com/repos/Second-Hand-Friends/kleinanzeigen-bot/releases", timeout = self._request_timeout())
response.raise_for_status()
releases = response.json()
# Find the most recent prerelease
diff --git a/src/kleinanzeigen_bot/utils/web_scraping_mixin.py b/src/kleinanzeigen_bot/utils/web_scraping_mixin.py
index 7a591fa..43189c2 100644
--- a/src/kleinanzeigen_bot/utils/web_scraping_mixin.py
+++ b/src/kleinanzeigen_bot/utils/web_scraping_mixin.py
@@ -4,6 +4,7 @@
import asyncio, enum, inspect, json, os, platform, secrets, shutil, subprocess, urllib.request # isort: skip # noqa: S404
from collections.abc import Awaitable, Callable, Coroutine, Iterable
from gettext import gettext as _
+from pathlib import Path
from typing import Any, Final, Optional, cast
try:
@@ -22,7 +23,7 @@ from nodriver.core.tab import Tab as Page
from kleinanzeigen_bot.model.config_model import Config as BotConfig
from kleinanzeigen_bot.model.config_model import TimeoutConfig
-from . import files, loggers, net
+from . import files, loggers, net, xdg_paths
from .chrome_version_detector import (
ChromeVersionInfo,
detect_chrome_version_from_binary,
@@ -40,6 +41,28 @@ if TYPE_CHECKING:
_KEY_VALUE_PAIR_SIZE = 2
+def _resolve_user_data_dir_paths(arg_value:str, config_value:str) -> tuple[Any, Any]:
+ """Resolve the argument and config user_data_dir paths for comparison."""
+ try:
+ return (
+ Path(arg_value).expanduser().resolve(),
+ Path(config_value).expanduser().resolve(),
+ )
+ except OSError as exc:
+ LOG.debug("Failed to resolve user_data_dir paths for comparison: %s", exc)
+ return None, None
+
+
+def _has_non_empty_user_data_dir_arg(args:Iterable[str]) -> bool:
+ for arg in args:
+ if not arg.startswith("--user-data-dir="):
+ continue
+ raw = arg.split("=", maxsplit = 1)[1].strip().strip('"').strip("'")
+ if raw:
+ return True
+ return False
+
+
def _is_remote_object(obj:Any) -> TypeGuard["RemoteObject"]:
"""Type guard to check if an object is a RemoteObject."""
return hasattr(obj, "__class__") and "RemoteObject" in str(type(obj))
@@ -58,7 +81,7 @@ __all__ = [
LOG:Final[loggers.Logger] = loggers.get_logger(__name__)
# see https://api.jquery.com/category/selectors/
-METACHAR_ESCAPER:Final[dict[int, str]] = str.maketrans({ch: f"\\{ch}" for ch in '!"#$%&\'()*+,./:;<=>?@[\\]^`{|}~'})
+METACHAR_ESCAPER:Final[dict[int, str]] = str.maketrans({ch: f"\\{ch}" for ch in "!\"#$%&'()*+,./:;<=>?@[\\]^`{|}~"})
def _is_admin() -> bool:
@@ -90,7 +113,6 @@ class Is(enum.Enum):
class BrowserConfig:
-
def __init__(self) -> None:
self.arguments:Iterable[str] = []
self.binary_location:str | None = None
@@ -102,37 +124,27 @@ class BrowserConfig:
def _write_initial_prefs(prefs_file:str) -> None:
with open(prefs_file, "w", encoding = "UTF-8") as fd:
- json.dump({
- "credentials_enable_service": False,
- "enable_do_not_track": True,
- "google": {
- "services": {
- "consented_to_sync": False
- }
- },
- "profile": {
- "default_content_setting_values": {
- "popups": 0,
- "notifications": 2 # 1 = allow, 2 = block browser notifications
+ json.dump(
+ {
+ "credentials_enable_service": False,
+ "enable_do_not_track": True,
+ "google": {"services": {"consented_to_sync": False}},
+ "profile": {
+ "default_content_setting_values": {
+ "popups": 0,
+ "notifications": 2, # 1 = allow, 2 = block browser notifications
+ },
+ "password_manager_enabled": False,
},
- "password_manager_enabled": False
+ "signin": {"allowed": False},
+ "translate_site_blacklist": ["www.kleinanzeigen.de"],
+ "devtools": {"preferences": {"currentDockState": '"bottom"'}},
},
- "signin": {
- "allowed": False
- },
- "translate_site_blacklist": [
- "www.kleinanzeigen.de"
- ],
- "devtools": {
- "preferences": {
- "currentDockState": '"bottom"'
- }
- }
- }, fd)
+ fd,
+ )
class WebScrapingMixin:
-
def __init__(self) -> None:
self.browser_config:Final[BrowserConfig] = BrowserConfig()
self.browser:Browser = None # pyright: ignore[reportAttributeAccessIssue]
@@ -140,6 +152,11 @@ class WebScrapingMixin:
self._default_timeout_config:TimeoutConfig | None = None
self.config:BotConfig = cast(BotConfig, None)
+ @property
+ def _installation_mode(self) -> str:
+ """Get installation mode with fallback to portable."""
+ return getattr(self, "installation_mode_or_portable", "portable")
+
def _get_timeout_config(self) -> TimeoutConfig:
config = getattr(self, "config", None)
timeouts:TimeoutConfig | None = None
@@ -172,12 +189,7 @@ class WebScrapingMixin:
return 1 + cfg.retry_max_attempts
async def _run_with_timeout_retries(
- self,
- operation:Callable[[float], Awaitable[T]],
- *,
- description:str,
- key:str = "default",
- override:float | None = None
+ self, operation:Callable[[float], Awaitable[T]], *, description:str, key:str = "default", override:float | None = None
) -> T:
"""
Execute an async callable with retry/backoff handling for TimeoutError.
@@ -191,13 +203,7 @@ class WebScrapingMixin:
except TimeoutError:
if attempt >= attempts - 1:
raise
- LOG.debug(
- "Retrying %s after TimeoutError (attempt %d/%d, timeout %.1fs)",
- description,
- attempt + 1,
- attempts,
- effective_timeout
- )
+ LOG.debug("Retrying %s after TimeoutError (attempt %d/%d, timeout %.1fs)", description, attempt + 1, attempts, effective_timeout)
raise TimeoutError(f"{description} failed without executing operation")
@@ -210,8 +216,25 @@ class WebScrapingMixin:
self.browser_config.binary_location = self.get_compatible_browser()
LOG.info(" -> Browser binary location: %s", self.browser_config.binary_location)
+ has_remote_debugging = any(arg.startswith("--remote-debugging-port=") for arg in self.browser_config.arguments)
+ is_test_environment = bool(os.environ.get("PYTEST_CURRENT_TEST"))
+
+ if (
+ not (self.browser_config.user_data_dir and self.browser_config.user_data_dir.strip())
+ and not _has_non_empty_user_data_dir_arg(self.browser_config.arguments)
+ and not has_remote_debugging
+ and not is_test_environment
+ ):
+ self.browser_config.user_data_dir = str(xdg_paths.get_browser_profile_path(self._installation_mode))
+
# Chrome version detection and validation
- await self._validate_chrome_version_configuration()
+ if has_remote_debugging:
+ try:
+ await self._validate_chrome_version_configuration()
+ except AssertionError as exc:
+ LOG.warning(_("Remote debugging detected, but browser configuration looks invalid: %s"), exc)
+ else:
+ await self._validate_chrome_version_configuration()
########################################################
# check if an existing browser instance shall be used...
@@ -229,10 +252,12 @@ class WebScrapingMixin:
# Enhanced port checking with retry logic
port_available = await self._check_port_with_retry(remote_host, remote_port)
- ensure(port_available,
+ ensure(
+ port_available,
f"Browser process not reachable at {remote_host}:{remote_port}. "
f"Start the browser with --remote-debugging-port={remote_port} or remove this port from your config.yaml. "
- f"Make sure the browser is running and the port is not blocked by firewall.")
+ f"Make sure the browser is running and the port is not blocked by firewall.",
+ )
try:
cfg = NodriverConfig(
@@ -255,8 +280,7 @@ class WebScrapingMixin:
LOG.error("Troubleshooting steps:")
LOG.error("1. Close all browser instances and try again")
LOG.error("2. Remove the user_data_dir configuration temporarily")
- LOG.error("3. Start browser manually with: %s --remote-debugging-port=%d",
- self.browser_config.binary_location, remote_port)
+ LOG.error("3. Start browser manually with: %s --remote-debugging-port=%d", self.browser_config.binary_location, remote_port)
LOG.error("4. Check if any antivirus or security software is blocking the connection")
raise
@@ -274,13 +298,11 @@ class WebScrapingMixin:
"--disable-sync",
"--no-experiments",
"--disable-search-engine-choice-screen",
-
"--disable-features=MediaRouter",
"--use-mock-keychain",
-
"--test-type", # https://stackoverflow.com/a/36746675/5116073
# https://chromium.googlesource.com/chromium/src/+/master/net/dns/README.md#request-remapping
- '--host-resolver-rules="MAP connect.facebook.net 127.0.0.1, MAP securepubads.g.doubleclick.net 127.0.0.1, MAP www.googletagmanager.com 127.0.0.1"'
+ '--host-resolver-rules="MAP connect.facebook.net 127.0.0.1, MAP securepubads.g.doubleclick.net 127.0.0.1, MAP www.googletagmanager.com 127.0.0.1"',
]
is_edge = "edge" in self.browser_config.binary_location.lower()
@@ -295,10 +317,36 @@ class WebScrapingMixin:
LOG.info(" -> Browser profile name: %s", self.browser_config.profile_name)
browser_args.append(f"--profile-directory={self.browser_config.profile_name}")
+ user_data_dir_from_args:str | None = None
for browser_arg in self.browser_config.arguments:
LOG.info(" -> Custom Browser argument: %s", browser_arg)
+ if browser_arg.startswith("--user-data-dir="):
+ raw = browser_arg.split("=", maxsplit = 1)[1].strip().strip('"').strip("'")
+ if not raw:
+ LOG.warning(_("Ignoring empty --user-data-dir= argument; falling back to configured user_data_dir."))
+ continue
+ user_data_dir_from_args = raw
+ continue
browser_args.append(browser_arg)
+ effective_user_data_dir = user_data_dir_from_args or self.browser_config.user_data_dir
+ if user_data_dir_from_args and self.browser_config.user_data_dir:
+ arg_path, cfg_path = await asyncio.get_running_loop().run_in_executor(
+ None,
+ _resolve_user_data_dir_paths,
+ user_data_dir_from_args,
+ self.browser_config.user_data_dir,
+ )
+ if arg_path is None or cfg_path is None or arg_path != cfg_path:
+ LOG.warning(
+ _("Configured browser.user_data_dir (%s) does not match --user-data-dir argument (%s); using the argument value."),
+ self.browser_config.user_data_dir,
+ user_data_dir_from_args,
+ )
+ if not effective_user_data_dir and not is_test_environment:
+ effective_user_data_dir = str(xdg_paths.get_browser_profile_path(self._installation_mode))
+ self.browser_config.user_data_dir = effective_user_data_dir
+
if not loggers.is_debug(LOG):
browser_args.append("--log-level=3") # INFO: 0, WARNING: 1, ERROR: 2, FATAL: 3
@@ -309,7 +357,7 @@ class WebScrapingMixin:
headless = False,
browser_executable_path = self.browser_config.binary_location,
browser_args = browser_args,
- user_data_dir = self.browser_config.user_data_dir
+ user_data_dir = self.browser_config.user_data_dir,
)
# already logged by nodriver:
@@ -371,8 +419,7 @@ class WebScrapingMixin:
return True
if attempt < max_retries - 1:
- LOG.debug("Port %s:%s not available, retrying in %.1f seconds (attempt %d/%d)",
- host, port, retry_delay, attempt + 1, max_retries)
+ LOG.debug("Port %s:%s not available, retrying in %.1f seconds (attempt %d/%d)", host, port, retry_delay, attempt + 1, max_retries)
await asyncio.sleep(retry_delay)
return False
@@ -522,12 +569,7 @@ class WebScrapingMixin:
browser_paths:list[str | None] = []
match platform.system():
case "Linux":
- browser_paths = [
- shutil.which("chromium"),
- shutil.which("chromium-browser"),
- shutil.which("google-chrome"),
- shutil.which("microsoft-edge")
- ]
+ browser_paths = [shutil.which("chromium"), shutil.which("chromium-browser"), shutil.which("google-chrome"), shutil.which("microsoft-edge")]
case "Darwin":
browser_paths = [
@@ -540,18 +582,15 @@ class WebScrapingMixin:
browser_paths = [
os.environ.get("PROGRAMFILES", "C:\\Program Files") + r"\Microsoft\Edge\Application\msedge.exe",
os.environ.get("PROGRAMFILES(X86)", "C:\\Program Files (x86)") + r"\Microsoft\Edge\Application\msedge.exe",
-
os.environ["PROGRAMFILES"] + r"\Chromium\Application\chrome.exe",
os.environ["PROGRAMFILES(X86)"] + r"\Chromium\Application\chrome.exe",
os.environ["LOCALAPPDATA"] + r"\Chromium\Application\chrome.exe",
-
os.environ["PROGRAMFILES"] + r"\Chrome\Application\chrome.exe",
os.environ["PROGRAMFILES(X86)"] + r"\Chrome\Application\chrome.exe",
os.environ["LOCALAPPDATA"] + r"\Chrome\Application\chrome.exe",
-
shutil.which("msedge.exe"),
shutil.which("chromium.exe"),
- shutil.which("chrome.exe")
+ shutil.which("chrome.exe"),
]
case _ as os_name:
@@ -563,8 +602,14 @@ class WebScrapingMixin:
raise AssertionError(_("Installed browser could not be detected"))
- async def web_await(self, condition:Callable[[], T | Never | Coroutine[Any, Any, T | Never]], *,
- timeout:int | float | None = None, timeout_error_message:str = "", apply_multiplier:bool = True) -> T:
+ async def web_await(
+ self,
+ condition:Callable[[], T | Never | Coroutine[Any, Any, T | Never]],
+ *,
+ timeout:int | float | None = None,
+ timeout_error_message:str = "",
+ apply_multiplier:bool = True,
+ ) -> T:
"""
Blocks/waits until the given condition is met.
@@ -604,7 +649,9 @@ class WebScrapingMixin:
return elem.attrs.get("disabled") is not None
async def is_displayed(elem:Element) -> bool:
- return cast(bool, await elem.apply("""
+ return cast(
+ bool,
+ await elem.apply("""
function (element) {
var style = window.getComputedStyle(element);
return style.display !== 'none'
@@ -613,7 +660,8 @@ class WebScrapingMixin:
&& element.offsetWidth > 0
&& element.offsetHeight > 0
}
- """))
+ """),
+ )
elem:Element = await self.web_find(selector_type, selector_value, timeout = timeout)
@@ -627,7 +675,9 @@ class WebScrapingMixin:
case Is.READONLY:
return elem.attrs.get("readonly") is not None
case Is.SELECTED:
- return cast(bool, await elem.apply("""
+ return cast(
+ bool,
+ await elem.apply("""
function (element) {
if (element.tagName.toLowerCase() === 'input') {
if (element.type === 'checkbox' || element.type === 'radio') {
@@ -636,7 +686,8 @@ class WebScrapingMixin:
}
return false
}
- """))
+ """),
+ )
raise AssertionError(_("Unsupported attribute: %s") % attr)
async def web_click(self, selector_type:By, selector_value:str, *, timeout:int | float | None = None) -> Element:
@@ -743,11 +794,8 @@ class WebScrapingMixin:
async def attempt(effective_timeout:float) -> Element:
return await self._web_find_once(selector_type, selector_value, effective_timeout, parent = parent)
- return await self._run_with_timeout_retries(
- attempt,
- description = f"web_find({selector_type.name}, {selector_value})",
- key = "default",
- override = timeout
+ return await self._run_with_timeout_retries(
+ attempt, description = f"web_find({selector_type.name}, {selector_value})", key = "default", override = timeout
)
async def web_find_all(self, selector_type:By, selector_value:str, *, parent:Element | None = None, timeout:int | float | None = None) -> list[Element]:
@@ -762,10 +810,7 @@ class WebScrapingMixin:
return await self._web_find_all_once(selector_type, selector_value, effective_timeout, parent = parent)
return await self._run_with_timeout_retries(
- attempt,
- description = f"web_find_all({selector_type.name}, {selector_value})",
- key = "default",
- override = timeout
+ attempt, description = f"web_find_all({selector_type.name}, {selector_value})", key = "default", override = timeout
)
async def _web_find_once(self, selector_type:By, selector_value:str, timeout:float, *, parent:Element | None = None) -> Element:
@@ -778,40 +823,46 @@ class WebScrapingMixin:
lambda: self.page.query_selector(f"#{escaped_id}", parent),
timeout = timeout,
timeout_error_message = f"No HTML element found with ID '{selector_value}'{timeout_suffix}",
- apply_multiplier = False)
+ apply_multiplier = False,
+ )
case By.CLASS_NAME:
escaped_classname = selector_value.translate(METACHAR_ESCAPER)
return await self.web_await(
lambda: self.page.query_selector(f".{escaped_classname}", parent),
timeout = timeout,
timeout_error_message = f"No HTML element found with CSS class '{selector_value}'{timeout_suffix}",
- apply_multiplier = False)
+ apply_multiplier = False,
+ )
case By.TAG_NAME:
return await self.web_await(
lambda: self.page.query_selector(selector_value, parent),
timeout = timeout,
timeout_error_message = f"No HTML element found of tag <{selector_value}>{timeout_suffix}",
- apply_multiplier = False)
+ apply_multiplier = False,
+ )
case By.CSS_SELECTOR:
return await self.web_await(
lambda: self.page.query_selector(selector_value, parent),
timeout = timeout,
timeout_error_message = f"No HTML element found using CSS selector '{selector_value}'{timeout_suffix}",
- apply_multiplier = False)
+ apply_multiplier = False,
+ )
case By.TEXT:
ensure(not parent, f"Specifying a parent element currently not supported with selector type: {selector_type}")
return await self.web_await(
lambda: self.page.find_element_by_text(selector_value, best_match = True),
timeout = timeout,
timeout_error_message = f"No HTML element found containing text '{selector_value}'{timeout_suffix}",
- apply_multiplier = False)
+ apply_multiplier = False,
+ )
case By.XPATH:
ensure(not parent, f"Specifying a parent element currently not supported with selector type: {selector_type}")
return await self.web_await(
lambda: self.page.find_element_by_text(selector_value, best_match = True),
timeout = timeout,
timeout_error_message = f"No HTML element found using XPath '{selector_value}'{timeout_suffix}",
- apply_multiplier = False)
+ apply_multiplier = False,
+ )
raise AssertionError(_("Unsupported selector type: %s") % selector_type)
@@ -825,33 +876,38 @@ class WebScrapingMixin:
lambda: self.page.query_selector_all(f".{escaped_classname}", parent),
timeout = timeout,
timeout_error_message = f"No HTML elements found with CSS class '{selector_value}'{timeout_suffix}",
- apply_multiplier = False)
+ apply_multiplier = False,
+ )
case By.CSS_SELECTOR:
return await self.web_await(
lambda: self.page.query_selector_all(selector_value, parent),
timeout = timeout,
timeout_error_message = f"No HTML elements found using CSS selector '{selector_value}'{timeout_suffix}",
- apply_multiplier = False)
+ apply_multiplier = False,
+ )
case By.TAG_NAME:
return await self.web_await(
lambda: self.page.query_selector_all(selector_value, parent),
timeout = timeout,
timeout_error_message = f"No HTML elements found of tag <{selector_value}>{timeout_suffix}",
- apply_multiplier = False)
+ apply_multiplier = False,
+ )
case By.TEXT:
ensure(not parent, f"Specifying a parent element currently not supported with selector type: {selector_type}")
return await self.web_await(
lambda: self.page.find_elements_by_text(selector_value),
timeout = timeout,
timeout_error_message = f"No HTML elements found containing text '{selector_value}'{timeout_suffix}",
- apply_multiplier = False)
+ apply_multiplier = False,
+ )
case By.XPATH:
ensure(not parent, f"Specifying a parent element currently not supported with selector type: {selector_type}")
return await self.web_await(
lambda: self.page.find_elements_by_text(selector_value),
timeout = timeout,
timeout_error_message = f"No HTML elements found using XPath '{selector_value}'{timeout_suffix}",
- apply_multiplier = False)
+ apply_multiplier = False,
+ )
raise AssertionError(_("Unsupported selector type: %s") % selector_type)
@@ -885,11 +941,12 @@ class WebScrapingMixin:
lambda: self.web_execute("document.readyState == 'complete'"),
timeout = page_timeout,
timeout_error_message = f"Page did not finish loading within {page_timeout} seconds.",
- apply_multiplier = False
+ apply_multiplier = False,
)
async def web_text(self, selector_type:By, selector_value:str, *, parent:Element | None = None, timeout:int | float | None = None) -> str:
- return str(await (await self.web_find(selector_type, selector_value, parent = parent, timeout = timeout)).apply("""
+ return str(
+ await (await self.web_find(selector_type, selector_value, parent = parent, timeout = timeout)).apply("""
function (elem) {
let sel = window.getSelection()
sel.removeAllRanges()
@@ -900,16 +957,19 @@ class WebScrapingMixin:
sel.removeAllRanges()
return visibleText
}
- """))
+ """)
+ )
async def web_sleep(self, min_ms:int = 1_000, max_ms:int = 2_500) -> None:
duration = max_ms <= min_ms and min_ms or secrets.randbelow(max_ms - min_ms) + min_ms
- LOG.log(loggers.INFO if duration > 1_500 else loggers.DEBUG, # noqa: PLR2004 Magic value used in comparison
- " ... pausing for %d ms ...", duration)
+ LOG.log(
+ loggers.INFO if duration > 1_500 else loggers.DEBUG, # noqa: PLR2004 Magic value used in comparison
+ " ... pausing for %d ms ...",
+ duration,
+ )
await self.page.sleep(duration / 1_000)
- async def web_request(self, url:str, method:str = "GET", valid_response_codes:int | Iterable[int] = 200,
- headers:dict[str, str] | None = None) -> Any:
+ async def web_request(self, url:str, method:str = "GET", valid_response_codes:int | Iterable[int] = 200, headers:dict[str, str] | None = None) -> Any:
method = method.upper()
LOG.debug(" -> HTTP %s [%s]...", method, url)
response = await self.web_execute(f"""
@@ -933,9 +993,10 @@ class WebScrapingMixin:
valid_response_codes = [valid_response_codes]
ensure(
response["statusCode"] in valid_response_codes,
- f'Invalid response "{response["statusCode"]} response["statusMessage"]" received for HTTP {method} to {url}'
+ f'Invalid response "{response["statusCode"]} {response["statusMessage"]}" received for HTTP {method} to {url}',
)
return response
+
# pylint: enable=dangerous-default-value
async def web_scroll_page_down(self, scroll_length:int = 10, scroll_speed:int = 10_000, *, scroll_back_top:bool = False) -> None:
@@ -968,8 +1029,9 @@ class WebScrapingMixin:
:raises UnexpectedTagNameException: if element is not a