From eda1b4d0ecb3de1762d5d2f107b199f071a76534 Mon Sep 17 00:00:00 2001 From: Jens <1742418+1cu@users.noreply.github.com> Date: Fri, 23 Jan 2026 22:45:22 +0100 Subject: [PATCH] feat: add browser profile XDG support and documentation (#777) --- .github/workflows/build.yml | 9 +- README.md | 19 + pyproject.toml | 5 +- schemas/config.schema.json | 6 +- src/kleinanzeigen_bot/__init__.py | 186 +++---- src/kleinanzeigen_bot/model/config_model.py | 183 +++---- .../resources/translations.de.yaml | 7 +- src/kleinanzeigen_bot/update_checker.py | 18 +- .../utils/web_scraping_mixin.py | 302 ++++++----- src/kleinanzeigen_bot/utils/xdg_paths.py | 34 +- .../test_web_scraping_mixin_integration.py | 2 +- tests/unit/test_extract.py | 482 +++++++++--------- tests/unit/test_init.py | 31 +- .../test_web_scraping_mixin_chrome_version.py | 160 +++--- tests/unit/test_xdg_paths.py | 84 +-- 15 files changed, 841 insertions(+), 687 deletions(-) diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index f6eee8c..9023e53 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -172,10 +172,11 @@ jobs: set -eux case "${{ matrix.os }}" in - ubuntu-*) - sudo apt-get install --no-install-recommends -y xvfb - xvfb-run pdm run itest:cov -vv - ;; + ubuntu-*) + sudo apt-get install --no-install-recommends -y xvfb + # Run tests INSIDE xvfb context + xvfb-run bash -c 'pdm run itest:cov -vv' + ;; *) pdm run itest:cov -vv ;; esac diff --git a/README.md b/README.md index 335b38a..d763a37 100644 --- a/README.md +++ b/README.md @@ -248,6 +248,25 @@ Limitation of `download`: It's only possible to extract the cheapest given shipp All configuration files can be in YAML or JSON format. +### Installation modes (portable vs. system-wide) + +On first run, the app may ask which installation mode to use. In non-interactive environments (CI/headless), it defaults to portable mode and will not prompt; `--config` and `--logfile` override only their specific paths, and do not change other mode-dependent paths or the chosen installation mode behavior. + +1. **Portable mode (recommended for most users, especially on Windows):** + - Stores config, logs, downloads, and state in the current directory + - No admin permissions required + - Easy backup/migration; works from USB drives + +2. **System-wide mode (advanced users / multi-user setups):** + - Stores files in OS-standard locations + - Cleaner directory structure; better separation from working directory + - Requires proper permissions for user data directories + +**OS notes (brief):** +- **Windows:** System-wide uses AppData (Roaming/Local); portable keeps everything beside the `.exe`. +- **Linux:** System-wide follows XDG Base Directory spec; portable stays in the current working directory. +- **macOS:** System-wide uses `~/Library/Application Support/kleinanzeigen-bot` (and related dirs); portable stays in the current directory. + ### 1) Main configuration When executing the app it by default looks for a `config.yaml` file in the current directory. If it does not exist it will be created automatically. diff --git a/pyproject.toml b/pyproject.toml index ce78689..b2e3cf1 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -111,7 +111,8 @@ lint = { composite = ["lint:ruff", "lint:mypy", "lint:pyright"] } # Run unit tests only (exclude smoke and itest) utest = "python -m pytest --capture=tee-sys -m \"not itest and not smoke\"" # Run integration tests only (exclude smoke) -itest = "python -m pytest --capture=tee-sys -m \"itest and not smoke\"" +# Uses -n 0 to disable xdist parallelization - browser tests are flaky with parallel workers +itest = "python -m pytest --capture=tee-sys -m \"itest and not smoke\" -n 0" # Run smoke tests only smoke = "python -m pytest --capture=tee-sys -m smoke" # Run all tests in order: unit, integration, smoke @@ -126,7 +127,7 @@ test = { composite = ["utest", "itest", "smoke"] } "coverage:prepare" = { shell = "python scripts/coverage_helper.py prepare" } "test:cov" = { composite = ["coverage:prepare", "utest:cov", "itest:cov", "smoke:cov", "coverage:combine"] } "utest:cov" = { shell = "python scripts/coverage_helper.py run .temp/.coverage-unit.sqlite .temp/coverage-unit.xml \"not itest and not smoke\"" } -"itest:cov" = { shell = "python scripts/coverage_helper.py run .temp/.coverage-itest.sqlite .temp/coverage-integration.xml \"itest and not smoke\"" } +"itest:cov" = { shell = "python scripts/coverage_helper.py run .temp/.coverage-itest.sqlite .temp/coverage-integration.xml \"itest and not smoke\" -n 0" } "smoke:cov" = { shell = "python scripts/coverage_helper.py run .temp/.coverage-smoke.sqlite .temp/coverage-smoke.xml smoke" } "coverage:combine" = { shell = "python scripts/coverage_helper.py combine .temp/.coverage-unit.sqlite .temp/.coverage-itest.sqlite .temp/.coverage-smoke.sqlite" } # Run all tests with coverage in a single invocation diff --git a/schemas/config.schema.json b/schemas/config.schema.json index 08eb71f..e1be95a 100644 --- a/schemas/config.schema.json +++ b/schemas/config.schema.json @@ -185,7 +185,7 @@ "BrowserConfig": { "properties": { "arguments": { - "description": "See https://peter.sh/experiments/chromium-command-line-switches/", + "description": "See https://peter.sh/experiments/chromium-command-line-switches/. Browser profile path is auto-configured based on installation mode (portable/XDG).", "items": { "type": "string" }, @@ -227,8 +227,8 @@ "type": "null" } ], - "default": ".temp/browser-profile", - "description": "See https://github.com/chromium/chromium/blob/main/docs/user_data_dir.md", + "default": null, + "description": "See https://github.com/chromium/chromium/blob/main/docs/user_data_dir.md. If not specified, defaults to XDG cache directory in XDG mode or .temp/browser-profile in portable mode.", "title": "User Data Dir" }, "profile_name": { diff --git a/src/kleinanzeigen_bot/__init__.py b/src/kleinanzeigen_bot/__init__.py index 87134db..7819157 100644 --- a/src/kleinanzeigen_bot/__init__.py +++ b/src/kleinanzeigen_bot/__init__.py @@ -28,7 +28,7 @@ from .utils.web_scraping_mixin import By, Element, Is, WebScrapingMixin # W0406: possibly a bug, see https://github.com/PyCQA/pylint/issues/3933 -LOG: Final[loggers.Logger] = loggers.get_logger(__name__) +LOG:Final[loggers.Logger] = loggers.get_logger(__name__) LOG.setLevel(loggers.INFO) colorama.just_fix_windows_console() @@ -39,7 +39,7 @@ class AdUpdateStrategy(enum.Enum): MODIFY = enum.auto() -def _repost_cycle_ready(ad_cfg: Ad, ad_file_relative: str) -> bool: +def _repost_cycle_ready(ad_cfg:Ad, ad_file_relative:str) -> bool: """ Check if the repost cycle delay has been satisfied. @@ -72,7 +72,7 @@ def _repost_cycle_ready(ad_cfg: Ad, ad_file_relative: str) -> bool: return True -def _day_delay_elapsed(ad_cfg: Ad, ad_file_relative: str) -> bool: +def _day_delay_elapsed(ad_cfg:Ad, ad_file_relative:str) -> bool: """ Check if the day delay has elapsed since the ad was last published. @@ -100,7 +100,7 @@ def _day_delay_elapsed(ad_cfg: Ad, ad_file_relative: str) -> bool: return True -def apply_auto_price_reduction(ad_cfg: Ad, _ad_cfg_orig: dict[str, Any], ad_file_relative: str) -> None: +def apply_auto_price_reduction(ad_cfg:Ad, _ad_cfg_orig:dict[str, Any], ad_file_relative:str) -> None: """ Apply automatic price reduction to an ad based on repost count and configuration. @@ -132,7 +132,7 @@ def apply_auto_price_reduction(ad_cfg: Ad, _ad_cfg_orig: dict[str, Any], ad_file applied_cycles = ad_cfg.price_reduction_count or 0 next_cycle = applied_cycles + 1 - effective_price = calculate_auto_price(base_price=base_price, auto_price_reduction=ad_cfg.auto_price_reduction, target_reduction_cycle=next_cycle) + effective_price = calculate_auto_price(base_price = base_price, auto_price_reduction = ad_cfg.auto_price_reduction, target_reduction_cycle = next_cycle) if effective_price is None: return @@ -149,7 +149,7 @@ def apply_auto_price_reduction(ad_cfg: Ad, _ad_cfg_orig: dict[str, Any], ad_file # Note: price_reduction_count is persisted to ad_cfg_orig only after successful publish -class KleinanzeigenBot(WebScrapingMixin): +class KleinanzeigenBot(WebScrapingMixin): # noqa: PLR0904 def __init__(self) -> None: # workaround for https://github.com/Second-Hand-Friends/kleinanzeigen-bot/issues/295 # see https://github.com/pyinstaller/pyinstaller/issues/7229#issuecomment-1309383026 @@ -159,17 +159,17 @@ class KleinanzeigenBot(WebScrapingMixin): self.root_url = "https://www.kleinanzeigen.de" - self.config: Config + self.config:Config self.config_file_path = abspath("config.yaml") self.config_explicitly_provided = False - self.installation_mode: xdg_paths.InstallationMode | None = None + self.installation_mode:xdg_paths.InstallationMode | None = None - self.categories: dict[str, str] = {} + self.categories:dict[str, str] = {} - self.file_log: loggers.LogFileHandle | None = None + self.file_log:loggers.LogFileHandle | None = None log_file_basename = is_frozen() and os.path.splitext(os.path.basename(sys.executable))[0] or self.__module__ - self.log_file_path: str | None = abspath(f"{log_file_basename}.log") + self.log_file_path:str | None = abspath(f"{log_file_basename}.log") self.log_file_basename = log_file_basename self.log_file_explicitly_provided = False @@ -245,7 +245,7 @@ class KleinanzeigenBot(WebScrapingMixin): LOG.info(_("Installation mode: %s"), mode_display) LOG.info(_("Config file: %s"), self.config_file_path) - async def run(self, args: list[str]) -> None: + async def run(self, args:list[str]) -> None: self.parse_args(args) self.finalize_installation_mode() try: @@ -277,7 +277,7 @@ class KleinanzeigenBot(WebScrapingMixin): self.configure_file_logging() self.load_config() checker = UpdateChecker(self.config, self.installation_mode_or_portable) - checker.check_for_updates(skip_interval_check=True) + checker.check_for_updates(skip_interval_check = True) case "update-content-hash": self.configure_file_logging() self.load_config() @@ -285,7 +285,7 @@ class KleinanzeigenBot(WebScrapingMixin): checker = UpdateChecker(self.config, self.installation_mode_or_portable) checker.check_for_updates() self.ads_selector = "all" - if ads := self.load_ads(exclude_ads_with_id=False): + if ads := self.load_ads(exclude_ads_with_id = False): self.update_content_hashes(ads) else: LOG.info("############################################") @@ -503,7 +503,7 @@ class KleinanzeigenBot(WebScrapingMixin): ) ) - def parse_args(self, args: list[str]) -> None: + def parse_args(self, args:list[str]) -> None: try: options, arguments = getopt.gnu_getopt(args[1:], "hv", ["ads=", "config=", "force", "help", "keep-old", "logfile=", "lang=", "verbose"]) except getopt.error as ex: @@ -571,7 +571,7 @@ class KleinanzeigenBot(WebScrapingMixin): default_config.login.password = "changeme" # noqa: S105 placeholder for default config, not a real password dicts.save_dict( self.config_file_path, - default_config.model_dump(exclude_none=True, exclude={"ad_defaults": {"description"}}), + default_config.model_dump(exclude_none = True, exclude = {"ad_defaults": {"description"}}), header=( "# yaml-language-server: $schema=" "https://raw.githubusercontent.com/Second-Hand-Friends/kleinanzeigen-bot" @@ -585,7 +585,7 @@ class KleinanzeigenBot(WebScrapingMixin): self.create_default_config() config_yaml = dicts.load_dict_if_exists(self.config_file_path, _("config")) - self.config = Config.model_validate(config_yaml, strict=True, context=self.config_file_path) + self.config = Config.model_validate(config_yaml, strict = True, context = self.config_file_path) # load built-in category mappings self.categories = dicts.load_dict_from_module(resources, "categories.yaml", "categories") @@ -598,13 +598,13 @@ class KleinanzeigenBot(WebScrapingMixin): # populate browser_config object used by WebScrapingMixin self.browser_config.arguments = self.config.browser.arguments self.browser_config.binary_location = self.config.browser.binary_location - self.browser_config.extensions = [abspath(item, relative_to=self.config_file_path) for item in self.config.browser.extensions] + self.browser_config.extensions = [abspath(item, relative_to = self.config_file_path) for item in self.config.browser.extensions] self.browser_config.use_private_window = self.config.browser.use_private_window if self.config.browser.user_data_dir: - self.browser_config.user_data_dir = abspath(self.config.browser.user_data_dir, relative_to=self.config_file_path) + self.browser_config.user_data_dir = abspath(self.config.browser.user_data_dir, relative_to = self.config_file_path) self.browser_config.profile_name = self.config.browser.profile_name - def __check_ad_republication(self, ad_cfg: Ad, ad_file_relative: str) -> bool: + def __check_ad_republication(self, ad_cfg:Ad, ad_file_relative:str) -> bool: """ Check if an ad needs to be republished based on republication interval. Note: This method does not check for content changes. Use __check_ad_changed for that. @@ -635,7 +635,7 @@ class KleinanzeigenBot(WebScrapingMixin): return True - def __check_ad_changed(self, ad_cfg: Ad, ad_cfg_orig: dict[str, Any], ad_file_relative: str) -> bool: + def __check_ad_changed(self, ad_cfg:Ad, ad_cfg_orig:dict[str, Any], ad_file_relative:str) -> bool: """ Check if an ad has been changed since last publication. @@ -662,7 +662,7 @@ class KleinanzeigenBot(WebScrapingMixin): return False - def load_ads(self, *, ignore_inactive: bool = True, exclude_ads_with_id: bool = True) -> list[tuple[str, Ad, dict[str, Any]]]: + def load_ads(self, *, ignore_inactive:bool = True, exclude_ads_with_id:bool = True) -> list[tuple[str, Ad, dict[str, Any]]]: """ Load and validate all ad config files, optionally filtering out inactive or already-published ads. @@ -678,12 +678,12 @@ class KleinanzeigenBot(WebScrapingMixin): """ LOG.info("Searching for ad config files...") - ad_files: dict[str, str] = {} + ad_files:dict[str, str] = {} data_root_dir = os.path.dirname(self.config_file_path) for file_pattern in self.config.ad_files: - for ad_file in glob.glob(file_pattern, root_dir=data_root_dir, flags=glob.GLOBSTAR | glob.BRACE | glob.EXTGLOB): + for ad_file in glob.glob(file_pattern, root_dir = data_root_dir, flags = glob.GLOBSTAR | glob.BRACE | glob.EXTGLOB): if not str(ad_file).endswith("ad_fields.yaml"): - ad_files[abspath(ad_file, relative_to=data_root_dir)] = ad_file + ad_files[abspath(ad_file, relative_to = data_root_dir)] = ad_file LOG.info(" -> found %s", pluralize("ad config file", ad_files)) if not ad_files: return [] @@ -700,8 +700,8 @@ class KleinanzeigenBot(WebScrapingMixin): ads = [] for ad_file, ad_file_relative in sorted(ad_files.items()): - ad_cfg_orig: dict[str, Any] = dicts.load_dict(ad_file, "ad") - ad_cfg: Ad = self.load_ad(ad_cfg_orig) + ad_cfg_orig:dict[str, Any] = dicts.load_dict(ad_file, "ad") + ad_cfg:Ad = self.load_ad(ad_cfg_orig) if ignore_inactive and not ad_cfg.active: LOG.info(" -> SKIPPED: inactive ad [%s]", ad_file_relative) @@ -738,8 +738,8 @@ class KleinanzeigenBot(WebScrapingMixin): if not should_include: continue - ensure(self.__get_description(ad_cfg, with_affixes=False), f"-> property [description] not specified @ [{ad_file}]") - self.__get_description(ad_cfg, with_affixes=True) # validates complete description + ensure(self.__get_description(ad_cfg, with_affixes = False), f"-> property [description] not specified @ [{ad_file}]") + self.__get_description(ad_cfg, with_affixes = True) # validates complete description if ad_cfg.category: resolved_category_id = self.categories.get(ad_cfg.category) @@ -758,13 +758,13 @@ class KleinanzeigenBot(WebScrapingMixin): ad_dir = os.path.dirname(ad_file) for image_pattern in ad_cfg.images: pattern_images = set() - for image_file in glob.glob(image_pattern, root_dir=ad_dir, flags=glob.GLOBSTAR | glob.BRACE | glob.EXTGLOB): + for image_file in glob.glob(image_pattern, root_dir = ad_dir, flags = glob.GLOBSTAR | glob.BRACE | glob.EXTGLOB): _, image_file_ext = os.path.splitext(image_file) ensure(image_file_ext.lower() in {".gif", ".jpg", ".jpeg", ".png"}, f"Unsupported image file type [{image_file}]") if os.path.isabs(image_file): pattern_images.add(image_file) else: - pattern_images.add(abspath(image_file, relative_to=ad_file)) + pattern_images.add(abspath(image_file, relative_to = ad_file)) images.extend(sorted(pattern_images)) ensure(images or not ad_cfg.images, f"No images found for given file patterns {ad_cfg.images} at {ad_dir}") ad_cfg.images = list(dict.fromkeys(images)) @@ -774,13 +774,13 @@ class KleinanzeigenBot(WebScrapingMixin): LOG.info("Loaded %s", pluralize("ad", ads)) return ads - def load_ad(self, ad_cfg_orig: dict[str, Any]) -> Ad: + def load_ad(self, ad_cfg_orig:dict[str, Any]) -> Ad: return AdPartial.model_validate(ad_cfg_orig).to_ad(self.config.ad_defaults) - async def check_and_wait_for_captcha(self, *, is_login_page: bool = True) -> None: + async def check_and_wait_for_captcha(self, *, is_login_page:bool = True) -> None: try: captcha_timeout = self._timeout("captcha_detection") - await self.web_find(By.CSS_SELECTOR, "iframe[name^='a-'][src^='https://www.google.com/recaptcha/api2/anchor?']", timeout=captcha_timeout) + await self.web_find(By.CSS_SELECTOR, "iframe[name^='a-'][src^='https://www.google.com/recaptcha/api2/anchor?']", timeout = captcha_timeout) if not is_login_page and self.config.captcha.auto_restart: LOG.warning("Captcha recognized - auto-restart enabled, abort run...") @@ -833,14 +833,14 @@ class KleinanzeigenBot(WebScrapingMixin): await self.web_input(By.ID, "login-password", "") await self.web_input(By.ID, "login-password", self.config.login.password) - await self.check_and_wait_for_captcha(is_login_page=True) + await self.check_and_wait_for_captcha(is_login_page = True) await self.web_click(By.CSS_SELECTOR, "form#login-form button[type='submit']") async def handle_after_login_logic(self) -> None: try: sms_timeout = self._timeout("sms_verification") - await self.web_find(By.TEXT, "Wir haben dir gerade einen 6-stelligen Code für die Telefonnummer", timeout=sms_timeout) + await self.web_find(By.TEXT, "Wir haben dir gerade einen 6-stelligen Code für die Telefonnummer", timeout = sms_timeout) LOG.warning("############################################") LOG.warning("# Device verification message detected. Please follow the instruction displayed in the Browser.") LOG.warning("############################################") @@ -852,10 +852,10 @@ class KleinanzeigenBot(WebScrapingMixin): try: LOG.info("Handling GDPR disclaimer...") gdpr_timeout = self._timeout("gdpr_prompt") - await self.web_find(By.ID, "gdpr-banner-accept", timeout=gdpr_timeout) + await self.web_find(By.ID, "gdpr-banner-accept", timeout = gdpr_timeout) await self.web_click(By.ID, "gdpr-banner-cmp-button") await self.web_click( - By.XPATH, "//div[@id='ConsentManagementPage']//*//button//*[contains(., 'Alle ablehnen und fortfahren')]", timeout=gdpr_timeout + By.XPATH, "//div[@id='ConsentManagementPage']//*//button//*[contains(., 'Alle ablehnen und fortfahren')]", timeout = gdpr_timeout ) except TimeoutError: # GDPR banner not shown within timeout. @@ -873,7 +873,7 @@ class KleinanzeigenBot(WebScrapingMixin): # Try to find the standard element first try: - user_info = await self.web_text(By.CLASS_NAME, "mr-medium", timeout=login_check_timeout) + user_info = await self.web_text(By.CLASS_NAME, "mr-medium", timeout = login_check_timeout) if username in user_info.lower(): LOG.debug(_("Login detected via .mr-medium element")) return True @@ -882,7 +882,7 @@ class KleinanzeigenBot(WebScrapingMixin): # If standard element not found or didn't contain username, try the alternative try: - user_info = await self.web_text(By.ID, "user-email", timeout=login_check_timeout) + user_info = await self.web_text(By.ID, "user-email", timeout = login_check_timeout) if username in user_info.lower(): LOG.debug(_("Login detected via #user-email element")) return True @@ -892,7 +892,7 @@ class KleinanzeigenBot(WebScrapingMixin): LOG.debug(_("No login detected - neither .mr-medium nor #user-email found with username")) return False - async def delete_ads(self, ad_cfgs: list[tuple[str, Ad, dict[str, Any]]]) -> None: + async def delete_ads(self, ad_cfgs:list[tuple[str, Ad, dict[str, Any]]]) -> None: count = 0 published_ads = json.loads((await self.web_request(f"{self.root_url}/m-meine-anzeigen-verwalten.json?sort=DEFAULT"))["content"])["ads"] @@ -900,14 +900,14 @@ class KleinanzeigenBot(WebScrapingMixin): for ad_file, ad_cfg, _ad_cfg_orig in ad_cfgs: count += 1 LOG.info("Processing %s/%s: '%s' from [%s]...", count, len(ad_cfgs), ad_cfg.title, ad_file) - await self.delete_ad(ad_cfg, published_ads, delete_old_ads_by_title=self.config.publishing.delete_old_ads_by_title) + await self.delete_ad(ad_cfg, published_ads, delete_old_ads_by_title = self.config.publishing.delete_old_ads_by_title) await self.web_sleep() LOG.info("############################################") LOG.info("DONE: Deleted %s", pluralize("ad", count)) LOG.info("############################################") - async def delete_ad(self, ad_cfg: Ad, published_ads: list[dict[str, Any]], *, delete_old_ads_by_title: bool) -> bool: + async def delete_ad(self, ad_cfg:Ad, published_ads:list[dict[str, Any]], *, delete_old_ads_by_title:bool) -> bool: LOG.info("Deleting ad '%s' if already present...", ad_cfg.title) await self.web_open(f"{self.root_url}/m-meine-anzeigen.html") @@ -922,21 +922,21 @@ class KleinanzeigenBot(WebScrapingMixin): if ad_cfg.id == published_ad_id or ad_cfg.title == published_ad_title: LOG.info(" -> deleting %s '%s'...", published_ad_id, published_ad_title) await self.web_request( - url=f"{self.root_url}/m-anzeigen-loeschen.json?ids={published_ad_id}", method="POST", headers={"x-csrf-token": str(csrf_token)} + url = f"{self.root_url}/m-anzeigen-loeschen.json?ids={published_ad_id}", method = "POST", headers = {"x-csrf-token": str(csrf_token)} ) elif ad_cfg.id: await self.web_request( - url=f"{self.root_url}/m-anzeigen-loeschen.json?ids={ad_cfg.id}", - method="POST", - headers={"x-csrf-token": str(csrf_token)}, - valid_response_codes=[200, 404], + url = f"{self.root_url}/m-anzeigen-loeschen.json?ids={ad_cfg.id}", + method = "POST", + headers = {"x-csrf-token": str(csrf_token)}, + valid_response_codes = [200, 404], ) await self.web_sleep() ad_cfg.id = None return True - async def extend_ads(self, ad_cfgs: list[tuple[str, Ad, dict[str, Any]]]) -> None: + async def extend_ads(self, ad_cfgs:list[tuple[str, Ad, dict[str, Any]]]) -> None: """Extends ads that are close to expiry.""" # Fetch currently published ads from API published_ads = json.loads((await self.web_request(f"{self.root_url}/m-meine-anzeigen-verwalten.json?sort=DEFAULT"))["content"])["ads"] @@ -986,7 +986,7 @@ class KleinanzeigenBot(WebScrapingMixin): # Process extensions success_count = 0 - for idx, (ad_file, ad_cfg, ad_cfg_orig, _published_ad) in enumerate(ads_to_extend, start=1): + for idx, (ad_file, ad_cfg, ad_cfg_orig, _published_ad) in enumerate(ads_to_extend, start = 1): LOG.info(_("Processing %s/%s: '%s' from [%s]..."), idx, len(ads_to_extend), ad_cfg.title, ad_file) if await self.extend_ad(ad_file, ad_cfg, ad_cfg_orig): success_count += 1 @@ -996,7 +996,7 @@ class KleinanzeigenBot(WebScrapingMixin): LOG.info(_("DONE: Extended %s"), pluralize("ad", success_count)) LOG.info("############################################") - async def extend_ad(self, ad_file: str, ad_cfg: Ad, ad_cfg_orig: dict[str, Any]) -> bool: + async def extend_ad(self, ad_file:str, ad_cfg:Ad, ad_cfg_orig:dict[str, Any]) -> bool: """Extends a single ad listing.""" LOG.info(_("Extending ad '%s' (ID: %s)..."), ad_cfg.title, ad_cfg.id) @@ -1021,14 +1021,14 @@ class KleinanzeigenBot(WebScrapingMixin): # Simply close the dialog with the X button (aria-label="Schließen") try: dialog_close_timeout = self._timeout("quick_dom") - await self.web_click(By.CSS_SELECTOR, 'button[aria-label="Schließen"]', timeout=dialog_close_timeout) + await self.web_click(By.CSS_SELECTOR, 'button[aria-label="Schließen"]', timeout = dialog_close_timeout) LOG.debug(" -> Closed confirmation dialog") except TimeoutError: LOG.warning(_(" -> No confirmation dialog found, extension may have completed directly")) # Update metadata in YAML file # Update updated_on to track when ad was extended - ad_cfg_orig["updated_on"] = misc.now().isoformat(timespec="seconds") + ad_cfg_orig["updated_on"] = misc.now().isoformat(timespec = "seconds") dicts.save_dict(ad_file, ad_cfg_orig) LOG.info(_(" -> SUCCESS: ad extended with ID %s"), ad_cfg.id) @@ -1045,7 +1045,7 @@ class KleinanzeigenBot(WebScrapingMixin): # Check for success messages return await self.web_check(By.ID, "checking-done", Is.DISPLAYED) or await self.web_check(By.ID, "not-completed", Is.DISPLAYED) - async def publish_ads(self, ad_cfgs: list[tuple[str, Ad, dict[str, Any]]]) -> None: + async def publish_ads(self, ad_cfgs:list[tuple[str, Ad, dict[str, Any]]]) -> None: count = 0 failed_count = 0 max_retries = 3 @@ -1082,12 +1082,12 @@ class KleinanzeigenBot(WebScrapingMixin): if success: try: publish_timeout = self._timeout("publishing_result") - await self.web_await(self.__check_publishing_result, timeout=publish_timeout) + await self.web_await(self.__check_publishing_result, timeout = publish_timeout) except TimeoutError: LOG.warning(_(" -> Could not confirm publishing for '%s', but ad may be online"), ad_cfg.title) if success and self.config.publishing.delete_old_ads == "AFTER_PUBLISH" and not self.keep_old_ads: - await self.delete_ad(ad_cfg, published_ads, delete_old_ads_by_title=False) + await self.delete_ad(ad_cfg, published_ads, delete_old_ads_by_title = False) LOG.info("############################################") if failed_count > 0: @@ -1097,7 +1097,7 @@ class KleinanzeigenBot(WebScrapingMixin): LOG.info("############################################") async def publish_ad( - self, ad_file: str, ad_cfg: Ad, ad_cfg_orig: dict[str, Any], published_ads: list[dict[str, Any]], mode: AdUpdateStrategy = AdUpdateStrategy.REPLACE + self, ad_file:str, ad_cfg:Ad, ad_cfg_orig:dict[str, Any], published_ads:list[dict[str, Any]], mode:AdUpdateStrategy = AdUpdateStrategy.REPLACE ) -> None: """ @param ad_cfg: the effective ad config (i.e. with default values applied etc.) @@ -1108,7 +1108,7 @@ class KleinanzeigenBot(WebScrapingMixin): if mode == AdUpdateStrategy.REPLACE: if self.config.publishing.delete_old_ads == "BEFORE_PUBLISH" and not self.keep_old_ads: - await self.delete_ad(ad_cfg, published_ads, delete_old_ads_by_title=self.config.publishing.delete_old_ads_by_title) + await self.delete_ad(ad_cfg, published_ads, delete_old_ads_by_title = self.config.publishing.delete_old_ads_by_title) # Apply auto price reduction only for REPLACE operations (actual reposts) # This ensures price reductions only happen on republish, not on UPDATE @@ -1197,12 +1197,12 @@ class KleinanzeigenBot(WebScrapingMixin): elif not await self.web_check(By.ID, "radio-buy-now-no", Is.SELECTED): await self.web_click(By.ID, "radio-buy-now-no") except TimeoutError as ex: - LOG.debug(ex, exc_info=True) + LOG.debug(ex, exc_info = True) ############################# # set description ############################# - description = self.__get_description(ad_cfg, with_affixes=True) + description = self.__get_description(ad_cfg, with_affixes = True) await self.web_execute("document.querySelector('#pstad-descrptn').value = `" + description.replace("`", "'") + "`") await self.__set_contact_fields(ad_cfg.contact) @@ -1213,7 +1213,7 @@ class KleinanzeigenBot(WebScrapingMixin): ############################# img_items = await self.web_find_all(By.CSS_SELECTOR, "ul#j-pictureupload-thumbnails > li:not(.is-placeholder)") for element in img_items: - btn = await self.web_find(By.CSS_SELECTOR, "button.pictureupload-thumbnails-remove", parent=element) + btn = await self.web_find(By.CSS_SELECTOR, "button.pictureupload-thumbnails-remove", parent = element) await btn.click() ############################# @@ -1224,7 +1224,7 @@ class KleinanzeigenBot(WebScrapingMixin): ############################# # wait for captcha ############################# - await self.check_and_wait_for_captcha(is_login_page=False) + await self.check_and_wait_for_captcha(is_login_page = False) ############################# # submit @@ -1250,7 +1250,7 @@ class KleinanzeigenBot(WebScrapingMixin): ############################# try: short_timeout = self._timeout("quick_dom") - await self.web_find(By.ID, "myftr-shppngcrt-frm", timeout=short_timeout) + await self.web_find(By.ID, "myftr-shppngcrt-frm", timeout = short_timeout) LOG.warning("############################################") LOG.warning("# Payment form detected! Please proceed with payment.") @@ -1262,7 +1262,7 @@ class KleinanzeigenBot(WebScrapingMixin): pass confirmation_timeout = self._timeout("publishing_confirmation") - await self.web_await(lambda: "p-anzeige-aufgeben-bestaetigung.html?adId=" in self.page.url, timeout=confirmation_timeout) + await self.web_await(lambda: "p-anzeige-aufgeben-bestaetigung.html?adId=" in self.page.url, timeout = confirmation_timeout) # extract the ad id from the URL's query parameter current_url_query_params = urllib_parse.parse_qs(urllib_parse.urlparse(self.page.url).query) @@ -1272,7 +1272,7 @@ class KleinanzeigenBot(WebScrapingMixin): # Update content hash after successful publication # Calculate hash on original config to ensure consistent comparison on restart ad_cfg_orig["content_hash"] = AdPartial.model_validate(ad_cfg_orig).update_content_hash().content_hash - ad_cfg_orig["updated_on"] = misc.now().isoformat(timespec="seconds") + ad_cfg_orig["updated_on"] = misc.now().isoformat(timespec = "seconds") if not ad_cfg.created_on and not ad_cfg.id: ad_cfg_orig["created_on"] = ad_cfg_orig["updated_on"] @@ -1299,7 +1299,7 @@ class KleinanzeigenBot(WebScrapingMixin): dicts.save_dict(ad_file, ad_cfg_orig) - async def __set_contact_fields(self, contact: Contact) -> None: + async def __set_contact_fields(self, contact:Contact) -> None: ############################# # set contact zipcode ############################# @@ -1384,7 +1384,7 @@ class KleinanzeigenBot(WebScrapingMixin): ) ) - async def update_ads(self, ad_cfgs: list[tuple[str, Ad, dict[str, Any]]]) -> None: + async def update_ads(self, ad_cfgs:list[tuple[str, Ad, dict[str, Any]]]) -> None: """ Updates a list of ads. The list gets filtered, so that only already published ads will be updated. @@ -1415,25 +1415,25 @@ class KleinanzeigenBot(WebScrapingMixin): await self.publish_ad(ad_file, ad_cfg, ad_cfg_orig, published_ads, AdUpdateStrategy.MODIFY) publish_timeout = self._timeout("publishing_result") - await self.web_await(self.__check_publishing_result, timeout=publish_timeout) + await self.web_await(self.__check_publishing_result, timeout = publish_timeout) LOG.info("############################################") LOG.info("DONE: updated %s", pluralize("ad", count)) LOG.info("############################################") - async def __set_condition(self, condition_value: str) -> None: + async def __set_condition(self, condition_value:str) -> None: try: # Open condition dialog await self.web_click(By.XPATH, '//*[@id="j-post-listing-frontend-conditions"]//button[@aria-haspopup="true"]') except TimeoutError: - LOG.debug("Unable to open condition dialog and select condition [%s]", condition_value, exc_info=True) + LOG.debug("Unable to open condition dialog and select condition [%s]", condition_value, exc_info = True) return try: # Click radio button await self.web_click(By.ID, f"radio-button-{condition_value}") except TimeoutError: - LOG.debug("Unable to select condition [%s]", condition_value, exc_info=True) + LOG.debug("Unable to select condition [%s]", condition_value, exc_info = True) try: # Click accept button @@ -1441,7 +1441,7 @@ class KleinanzeigenBot(WebScrapingMixin): except TimeoutError as ex: raise TimeoutError(_("Unable to close condition dialog!")) from ex - async def __set_category(self, category: str | None, ad_file: str) -> None: + async def __set_category(self, category:str | None, ad_file:str) -> None: # click on something to trigger automatic category detection await self.web_click(By.ID, "pstad-descrptn") @@ -1464,7 +1464,7 @@ class KleinanzeigenBot(WebScrapingMixin): else: ensure(is_category_auto_selected, f"No category specified in [{ad_file}] and automatic category detection failed") - async def __set_special_attributes(self, ad_cfg: Ad) -> None: + async def __set_special_attributes(self, ad_cfg:Ad) -> None: if not ad_cfg.special_attributes: return @@ -1499,7 +1499,7 @@ class KleinanzeigenBot(WebScrapingMixin): raise TimeoutError(_("Failed to set attribute '%s'") % special_attribute_key) from ex try: - elem_id: str = str(special_attr_elem.attrs.id) + elem_id:str = str(special_attr_elem.attrs.id) if special_attr_elem.local_name == "select": LOG.debug(_("Attribute field '%s' seems to be a select..."), special_attribute_key) await self.web_select(By.ID, elem_id, special_attribute_value_str) @@ -1517,26 +1517,26 @@ class KleinanzeigenBot(WebScrapingMixin): raise TimeoutError(_("Failed to set attribute '%s'") % special_attribute_key) from ex LOG.debug("Successfully set attribute field [%s] to [%s]...", special_attribute_key, special_attribute_value_str) - async def __set_shipping(self, ad_cfg: Ad, mode: AdUpdateStrategy = AdUpdateStrategy.REPLACE) -> None: + async def __set_shipping(self, ad_cfg:Ad, mode:AdUpdateStrategy = AdUpdateStrategy.REPLACE) -> None: short_timeout = self._timeout("quick_dom") if ad_cfg.shipping_type == "PICKUP": try: await self.web_click(By.ID, "radio-pickup") except TimeoutError as ex: - LOG.debug(ex, exc_info=True) + LOG.debug(ex, exc_info = True) elif ad_cfg.shipping_options: await self.web_click(By.XPATH, '//button//span[contains(., "Versandmethoden auswählen")]') if mode == AdUpdateStrategy.MODIFY: try: # when "Andere Versandmethoden" is not available, go back and start over new - await self.web_find(By.XPATH, '//dialog//button[contains(., "Andere Versandmethoden")]', timeout=short_timeout) + await self.web_find(By.XPATH, '//dialog//button[contains(., "Andere Versandmethoden")]', timeout = short_timeout) except TimeoutError: await self.web_click(By.XPATH, '//dialog//button[contains(., "Zurück")]') # in some categories we need to go another dialog back try: - await self.web_find(By.XPATH, '//dialog//button[contains(., "Andere Versandmethoden")]', timeout=short_timeout) + await self.web_find(By.XPATH, '//dialog//button[contains(., "Andere Versandmethoden")]', timeout = short_timeout) except TimeoutError: await self.web_click(By.XPATH, '//dialog//button[contains(., "Zurück")]') @@ -1562,7 +1562,7 @@ class KleinanzeigenBot(WebScrapingMixin): try: # only click on "Individueller Versand" when "IndividualShippingInput" is not available, otherwise its already checked # (important for mode = UPDATE) - await self.web_find(By.XPATH, '//input[contains(@placeholder, "Versandkosten (optional)")]', timeout=short_timeout) + await self.web_find(By.XPATH, '//input[contains(@placeholder, "Versandkosten (optional)")]', timeout = short_timeout) except TimeoutError: # Input not visible yet; click the individual shipping option. await self.web_click(By.XPATH, '//*[contains(@id, "INDIVIDUAL") and contains(@data-testid, "Individueller Versand")]') @@ -1573,10 +1573,10 @@ class KleinanzeigenBot(WebScrapingMixin): ) await self.web_click(By.XPATH, '//dialog//button[contains(., "Fertig")]') except TimeoutError as ex: - LOG.debug(ex, exc_info=True) + LOG.debug(ex, exc_info = True) raise TimeoutError(_("Unable to close shipping dialog!")) from ex - async def __set_shipping_options(self, ad_cfg: Ad, mode: AdUpdateStrategy = AdUpdateStrategy.REPLACE) -> None: + async def __set_shipping_options(self, ad_cfg:Ad, mode:AdUpdateStrategy = AdUpdateStrategy.REPLACE) -> None: if not ad_cfg.shipping_options: return @@ -1596,7 +1596,7 @@ class KleinanzeigenBot(WebScrapingMixin): except KeyError as ex: raise KeyError(f"Unknown shipping option(s), please refer to the documentation/README: {ad_cfg.shipping_options}") from ex - shipping_sizes, shipping_selector, shipping_packages = zip(*mapped_shipping_options, strict=False) + shipping_sizes, shipping_selector, shipping_packages = zip(*mapped_shipping_options, strict = False) try: (shipping_size,) = set(shipping_sizes) @@ -1652,19 +1652,19 @@ class KleinanzeigenBot(WebScrapingMixin): for shipping_package in to_be_clicked_shipping_packages: await self.web_click(By.XPATH, f'//dialog//input[contains(@data-testid, "{shipping_package}")]') except TimeoutError as ex: - LOG.debug(ex, exc_info=True) + LOG.debug(ex, exc_info = True) try: # Click apply button await self.web_click(By.XPATH, '//dialog//button[contains(., "Fertig")]') except TimeoutError as ex: raise TimeoutError(_("Unable to close shipping dialog!")) from ex - async def __upload_images(self, ad_cfg: Ad) -> None: + async def __upload_images(self, ad_cfg:Ad) -> None: if not ad_cfg.images: return LOG.info(" -> found %s", pluralize("image", ad_cfg.images)) - image_upload: Element = await self.web_find(By.CSS_SELECTOR, "input[type=file]") + image_upload:Element = await self.web_find(By.CSS_SELECTOR, "input[type=file]") for image in ad_cfg.images: LOG.info(" -> uploading image [%s]", image) @@ -1680,7 +1680,7 @@ class KleinanzeigenBot(WebScrapingMixin): thumbnails = await self.web_find_all( By.CSS_SELECTOR, "ul#j-pictureupload-thumbnails > li:not(.is-placeholder)", - timeout=self._timeout("quick_dom"), # Fast timeout for polling + timeout = self._timeout("quick_dom"), # Fast timeout for polling ) current_count = len(thumbnails) if current_count < expected_count: @@ -1691,12 +1691,12 @@ class KleinanzeigenBot(WebScrapingMixin): return False try: - await self.web_await(check_thumbnails_uploaded, timeout=self._timeout("image_upload"), timeout_error_message=_("Image upload timeout exceeded")) + await self.web_await(check_thumbnails_uploaded, timeout = self._timeout("image_upload"), timeout_error_message = _("Image upload timeout exceeded")) except TimeoutError as ex: # Get current count for better error message try: thumbnails = await self.web_find_all( - By.CSS_SELECTOR, "ul#j-pictureupload-thumbnails > li:not(.is-placeholder)", timeout=self._timeout("quick_dom") + By.CSS_SELECTOR, "ul#j-pictureupload-thumbnails > li:not(.is-placeholder)", timeout = self._timeout("quick_dom") ) current_count = len(thumbnails) except TimeoutError: @@ -1738,7 +1738,7 @@ class KleinanzeigenBot(WebScrapingMixin): elif self.ads_selector == "new": # download only unsaved ads # check which ads already saved saved_ad_ids = [] - ads = self.load_ads(ignore_inactive=False, exclude_ads_with_id=False) # do not skip because of existing IDs + ads = self.load_ads(ignore_inactive = False, exclude_ads_with_id = False) # do not skip because of existing IDs for ad in ads: saved_ad_id = ad[1].id if saved_ad_id is None: @@ -1775,7 +1775,7 @@ class KleinanzeigenBot(WebScrapingMixin): else: LOG.error("The page with the id %d does not exist!", ad_id) - def __get_description(self, ad_cfg: Ad, *, with_affixes: bool) -> str: + def __get_description(self, ad_cfg:Ad, *, with_affixes:bool) -> str: """Get the ad description optionally with prefix and suffix applied. Precedence (highest to lowest): @@ -1827,7 +1827,7 @@ class KleinanzeigenBot(WebScrapingMixin): return final_description - def update_content_hashes(self, ads: list[tuple[str, Ad, dict[str, Any]]]) -> None: + def update_content_hashes(self, ads:list[tuple[str, Ad, dict[str, Any]]]) -> None: count = 0 for ad_file, ad_cfg, ad_cfg_orig in ads: @@ -1848,7 +1848,7 @@ class KleinanzeigenBot(WebScrapingMixin): ############################# -def main(args: list[str]) -> None: +def main(args:list[str]) -> None: if "version" not in args: print( textwrap.dedent(rf""" @@ -1861,7 +1861,7 @@ def main(args: list[str]) -> None: https://github.com/Second-Hand-Friends/kleinanzeigen-bot Version: {__version__} """)[1:], - flush=True, + flush = True, ) # [1:] removes the first empty blank line loggers.configure_console_logging() diff --git a/src/kleinanzeigen_bot/model/config_model.py b/src/kleinanzeigen_bot/model/config_model.py index 9ad7438..e426a28 100644 --- a/src/kleinanzeigen_bot/model/config_model.py +++ b/src/kleinanzeigen_bot/model/config_model.py @@ -15,22 +15,22 @@ from kleinanzeigen_bot.utils import dicts from kleinanzeigen_bot.utils.misc import get_attr from kleinanzeigen_bot.utils.pydantics import ContextualModel -_MAX_PERCENTAGE: Final[int] = 100 +_MAX_PERCENTAGE:Final[int] = 100 class AutoPriceReductionConfig(ContextualModel): - enabled: bool = Field(default=False, description="automatically lower the price of reposted ads") - strategy: Literal["FIXED", "PERCENTAGE"] | None = Field( - default=None, description="PERCENTAGE reduces by a percentage of the previous price, FIXED reduces by a fixed amount" + enabled:bool = Field(default = False, description = "automatically lower the price of reposted ads") + strategy:Literal["FIXED", "PERCENTAGE"] | None = Field( + default = None, description = "PERCENTAGE reduces by a percentage of the previous price, FIXED reduces by a fixed amount" ) - amount: float | None = Field( - default=None, gt=0, description="magnitude of the reduction; interpreted as percent for PERCENTAGE or currency units for FIXED" + amount:float | None = Field( + default = None, gt = 0, description = "magnitude of the reduction; interpreted as percent for PERCENTAGE or currency units for FIXED" ) - min_price: float | None = Field(default=None, ge=0, description="required when enabled is true; minimum price floor (use 0 for no lower bound)") - delay_reposts: int = Field(default=0, ge=0, description="number of reposts to wait before applying the first automatic price reduction") - delay_days: int = Field(default=0, ge=0, description="number of days to wait after publication before applying automatic price reductions") + min_price:float | None = Field(default = None, ge = 0, description = "required when enabled is true; minimum price floor (use 0 for no lower bound)") + delay_reposts:int = Field(default = 0, ge = 0, description = "number of reposts to wait before applying the first automatic price reduction") + delay_days:int = Field(default = 0, ge = 0, description = "number of days to wait after publication before applying automatic price reductions") - @model_validator(mode="after") + @model_validator(mode = "after") def _validate_config(self) -> "AutoPriceReductionConfig": if self.enabled: if self.strategy is None: @@ -45,38 +45,38 @@ class AutoPriceReductionConfig(ContextualModel): class ContactDefaults(ContextualModel): - name: str | None = None - street: str | None = None - zipcode: int | str | None = None - location: str | None = Field( - default=None, description="city or locality of the listing (can include multiple districts)", examples=["Sample Town - District One"] + name:str | None = None + street:str | None = None + zipcode:int | str | None = None + location:str | None = Field( + default = None, description = "city or locality of the listing (can include multiple districts)", examples = ["Sample Town - District One"] ) - phone: str | None = None + phone:str | None = None @deprecated("Use description_prefix/description_suffix instead") class DescriptionAffixes(ContextualModel): - prefix: str | None = None - suffix: str | None = None + prefix:str | None = None + suffix:str | None = None class AdDefaults(ContextualModel): - active: bool = True - type: Literal["OFFER", "WANTED"] = "OFFER" - description: DescriptionAffixes | None = None - description_prefix: str | None = Field(default=None, description="prefix for the ad description") - description_suffix: str | None = Field(default=None, description=" suffix for the ad description") - price_type: Literal["FIXED", "NEGOTIABLE", "GIVE_AWAY", "NOT_APPLICABLE"] = "NEGOTIABLE" - auto_price_reduction: AutoPriceReductionConfig = Field(default_factory=AutoPriceReductionConfig, description="automatic price reduction configuration") - shipping_type: Literal["PICKUP", "SHIPPING", "NOT_APPLICABLE"] = "SHIPPING" - sell_directly: bool = Field(default=False, description="requires shipping_type SHIPPING to take effect") - images: list[str] | None = Field(default=None) - contact: ContactDefaults = Field(default_factory=ContactDefaults) - republication_interval: int = 7 + active:bool = True + type:Literal["OFFER", "WANTED"] = "OFFER" + description:DescriptionAffixes | None = None + description_prefix:str | None = Field(default = None, description = "prefix for the ad description") + description_suffix:str | None = Field(default = None, description = "suffix for the ad description") + price_type:Literal["FIXED", "NEGOTIABLE", "GIVE_AWAY", "NOT_APPLICABLE"] = "NEGOTIABLE" + auto_price_reduction:AutoPriceReductionConfig = Field(default_factory = AutoPriceReductionConfig, description = "automatic price reduction configuration") + shipping_type:Literal["PICKUP", "SHIPPING", "NOT_APPLICABLE"] = "SHIPPING" + sell_directly:bool = Field(default = False, description = "requires shipping_type SHIPPING to take effect") + images:list[str] | None = Field(default = None) + contact:ContactDefaults = Field(default_factory = ContactDefaults) + republication_interval:int = 7 - @model_validator(mode="before") + @model_validator(mode = "before") @classmethod - def migrate_legacy_description(cls, values: dict[str, Any]) -> dict[str, Any]: + def migrate_legacy_description(cls, values:dict[str, Any]) -> dict[str, Any]: # Ensure flat prefix/suffix take precedence over deprecated nested "description" description_prefix = values.get("description_prefix") description_suffix = values.get("description_suffix") @@ -91,71 +91,74 @@ class AdDefaults(ContextualModel): class DownloadConfig(ContextualModel): - include_all_matching_shipping_options: bool = Field(default=False, description="if true, all shipping options matching the package size will be included") - excluded_shipping_options: list[str] = Field(default_factory=list, description="list of shipping options to exclude, e.g. ['DHL_2', 'DHL_5']") - folder_name_max_length: int = Field(default=100, ge=10, le=255, description="maximum length for folder names when downloading ads (default: 100)") - rename_existing_folders: bool = Field(default=False, description="if true, rename existing folders without titles to include titles (default: false)") + include_all_matching_shipping_options:bool = Field( + default = False, + description = "if true, all shipping options matching the package size will be included", + ) + excluded_shipping_options:list[str] = Field(default_factory = list, description = "list of shipping options to exclude, e.g. ['DHL_2', 'DHL_5']") + folder_name_max_length:int = Field(default = 100, ge = 10, le = 255, description = "maximum length for folder names when downloading ads (default: 100)") + rename_existing_folders:bool = Field(default = False, description = "if true, rename existing folders without titles to include titles (default: false)") class BrowserConfig(ContextualModel): - arguments: list[str] = Field( - default_factory=list, + arguments:list[str] = Field( + default_factory = list, description=( "See https://peter.sh/experiments/chromium-command-line-switches/. " "Browser profile path is auto-configured based on installation mode (portable/XDG)." ), ) - binary_location: str | None = Field(default=None, description="path to custom browser executable, if not specified will be looked up on PATH") - extensions: list[str] = Field(default_factory=list, description="a list of .crx extension files to be loaded") - use_private_window: bool = True - user_data_dir: str | None = Field( - default=None, + binary_location:str | None = Field(default = None, description = "path to custom browser executable, if not specified will be looked up on PATH") + extensions:list[str] = Field(default_factory = list, description = "a list of .crx extension files to be loaded") + use_private_window:bool = True + user_data_dir:str | None = Field( + default = None, description=( "See https://github.com/chromium/chromium/blob/main/docs/user_data_dir.md. " "If not specified, defaults to XDG cache directory in XDG mode or .temp/browser-profile in portable mode." ), ) - profile_name: str | None = None + profile_name:str | None = None class LoginConfig(ContextualModel): - username: str = Field(..., min_length=1) - password: str = Field(..., min_length=1) + username:str = Field(..., min_length = 1) + password:str = Field(..., min_length = 1) class PublishingConfig(ContextualModel): - delete_old_ads: Literal["BEFORE_PUBLISH", "AFTER_PUBLISH", "NEVER"] | None = "AFTER_PUBLISH" - delete_old_ads_by_title: bool = Field(default=True, description="only works if delete_old_ads is set to BEFORE_PUBLISH") + delete_old_ads:Literal["BEFORE_PUBLISH", "AFTER_PUBLISH", "NEVER"] | None = "AFTER_PUBLISH" + delete_old_ads_by_title:bool = Field(default = True, description = "only works if delete_old_ads is set to BEFORE_PUBLISH") class CaptchaConfig(ContextualModel): - auto_restart: bool = False - restart_delay: str = "6h" + auto_restart:bool = False + restart_delay:str = "6h" class TimeoutConfig(ContextualModel): - multiplier: float = Field(default=1.0, ge=0.1, description="Global multiplier applied to all timeout values.") - default: float = Field(default=5.0, ge=0.0, description="Baseline timeout for DOM interactions.") - page_load: float = Field(default=15.0, ge=1.0, description="Page load timeout for web_open.") - captcha_detection: float = Field(default=2.0, ge=0.1, description="Timeout for captcha iframe detection.") - sms_verification: float = Field(default=4.0, ge=0.1, description="Timeout for SMS verification prompts.") - gdpr_prompt: float = Field(default=10.0, ge=1.0, description="Timeout for GDPR/consent dialogs.") - login_detection: float = Field(default=10.0, ge=1.0, description="Timeout for detecting existing login session via DOM elements.") - publishing_result: float = Field(default=300.0, ge=10.0, description="Timeout for publishing result checks.") - publishing_confirmation: float = Field(default=20.0, ge=1.0, description="Timeout for publish confirmation redirect.") - image_upload: float = Field(default=30.0, ge=5.0, description="Timeout for image upload and server-side processing.") - pagination_initial: float = Field(default=10.0, ge=1.0, description="Timeout for initial pagination lookup.") - pagination_follow_up: float = Field(default=5.0, ge=1.0, description="Timeout for subsequent pagination navigation.") - quick_dom: float = Field(default=2.0, ge=0.1, description="Generic short timeout for transient UI.") - update_check: float = Field(default=10.0, ge=1.0, description="Timeout for GitHub update checks.") - chrome_remote_probe: float = Field(default=2.0, ge=0.1, description="Timeout for local remote-debugging probes.") - chrome_remote_debugging: float = Field(default=5.0, ge=1.0, description="Timeout for remote debugging API calls.") - chrome_binary_detection: float = Field(default=10.0, ge=1.0, description="Timeout for chrome --version subprocesses.") - retry_enabled: bool = Field(default=True, description="Enable built-in retry/backoff for DOM operations.") - retry_max_attempts: int = Field(default=2, ge=1, description="Max retry attempts when retry is enabled.") - retry_backoff_factor: float = Field(default=1.5, ge=1.0, description="Exponential factor applied per retry attempt.") + multiplier:float = Field(default = 1.0, ge = 0.1, description = "Global multiplier applied to all timeout values.") + default:float = Field(default = 5.0, ge = 0.0, description = "Baseline timeout for DOM interactions.") + page_load:float = Field(default = 15.0, ge = 1.0, description = "Page load timeout for web_open.") + captcha_detection:float = Field(default = 2.0, ge = 0.1, description = "Timeout for captcha iframe detection.") + sms_verification:float = Field(default = 4.0, ge = 0.1, description = "Timeout for SMS verification prompts.") + gdpr_prompt:float = Field(default = 10.0, ge = 1.0, description = "Timeout for GDPR/consent dialogs.") + login_detection:float = Field(default = 10.0, ge = 1.0, description = "Timeout for detecting existing login session via DOM elements.") + publishing_result:float = Field(default = 300.0, ge = 10.0, description = "Timeout for publishing result checks.") + publishing_confirmation:float = Field(default = 20.0, ge = 1.0, description = "Timeout for publish confirmation redirect.") + image_upload:float = Field(default = 30.0, ge = 5.0, description = "Timeout for image upload and server-side processing.") + pagination_initial:float = Field(default = 10.0, ge = 1.0, description = "Timeout for initial pagination lookup.") + pagination_follow_up:float = Field(default = 5.0, ge = 1.0, description = "Timeout for subsequent pagination navigation.") + quick_dom:float = Field(default = 2.0, ge = 0.1, description = "Generic short timeout for transient UI.") + update_check:float = Field(default = 10.0, ge = 1.0, description = "Timeout for GitHub update checks.") + chrome_remote_probe:float = Field(default = 2.0, ge = 0.1, description = "Timeout for local remote-debugging probes.") + chrome_remote_debugging:float = Field(default = 5.0, ge = 1.0, description = "Timeout for remote debugging API calls.") + chrome_binary_detection:float = Field(default = 10.0, ge = 1.0, description = "Timeout for chrome --version subprocesses.") + retry_enabled:bool = Field(default = True, description = "Enable built-in retry/backoff for DOM operations.") + retry_max_attempts:int = Field(default = 2, ge = 1, description = "Max retry attempts when retry is enabled.") + retry_backoff_factor:float = Field(default = 1.5, ge = 1.0, description = "Exponential factor applied per retry attempt.") - def resolve(self, key: str = "default", override: float | None = None) -> float: + def resolve(self, key:str = "default", override:float | None = None) -> float: """ Return the base timeout (seconds) for the given key without applying modifiers. """ @@ -171,7 +174,7 @@ class TimeoutConfig(ContextualModel): return float(self.default) - def effective(self, key: str = "default", override: float | None = None, *, attempt: int = 0) -> float: + def effective(self, key:str = "default", override:float | None = None, *, attempt:int = 0) -> float: """ Return the effective timeout (seconds) with multiplier/backoff applied. """ @@ -180,7 +183,7 @@ class TimeoutConfig(ContextualModel): return base * self.multiplier * backoff -def _validate_glob_pattern(v: str) -> str: +def _validate_glob_pattern(v:str) -> str: if not v.strip(): raise ValueError("must be a non-empty, non-blank glob pattern") return v @@ -190,20 +193,20 @@ GlobPattern = Annotated[str, AfterValidator(_validate_glob_pattern)] class Config(ContextualModel): - ad_files: list[GlobPattern] = Field( - default_factory=lambda: ["./**/ad_*.{json,yml,yaml}"], - min_items=1, - description=""" + ad_files:list[GlobPattern] = Field( + default_factory = lambda: ["./**/ad_*.{json,yml,yaml}"], + min_items = 1, + description = """ glob (wildcard) patterns to select ad configuration files if relative paths are specified, then they are relative to this configuration file """, ) # type: ignore[call-overload] - ad_defaults: AdDefaults = Field(default_factory=AdDefaults, description="Default values for ads, can be overwritten in each ad configuration file") + ad_defaults:AdDefaults = Field(default_factory = AdDefaults, description = "Default values for ads, can be overwritten in each ad configuration file") - categories: dict[str, str] = Field( - default_factory=dict, - description=""" + categories:dict[str, str] = Field( + default_factory = dict, + description = """ additional name to category ID mappings, see default list at https://github.com/Second-Hand-Friends/kleinanzeigen-bot/blob/main/src/kleinanzeigen_bot/resources/categories.yaml @@ -214,13 +217,13 @@ Example: """, ) - download: DownloadConfig = Field(default_factory=DownloadConfig) - publishing: PublishingConfig = Field(default_factory=PublishingConfig) - browser: BrowserConfig = Field(default_factory=BrowserConfig, description="Browser configuration") - login: LoginConfig = Field(default_factory=LoginConfig.model_construct, description="Login credentials") - captcha: CaptchaConfig = Field(default_factory=CaptchaConfig) - update_check: UpdateCheckConfig = Field(default_factory=UpdateCheckConfig, description="Update check configuration") - timeouts: TimeoutConfig = Field(default_factory=TimeoutConfig, description="Centralized timeout configuration.") + download:DownloadConfig = Field(default_factory = DownloadConfig) + publishing:PublishingConfig = Field(default_factory = PublishingConfig) + browser:BrowserConfig = Field(default_factory = BrowserConfig, description = "Browser configuration") + login:LoginConfig = Field(default_factory = LoginConfig.model_construct, description = "Login credentials") + captcha:CaptchaConfig = Field(default_factory = CaptchaConfig) + update_check:UpdateCheckConfig = Field(default_factory = UpdateCheckConfig, description = "Update check configuration") + timeouts:TimeoutConfig = Field(default_factory = TimeoutConfig, description = "Centralized timeout configuration.") - def with_values(self, values: dict[str, Any]) -> Config: - return Config.model_validate(dicts.apply_defaults(copy.deepcopy(values), defaults=self.model_dump())) + def with_values(self, values:dict[str, Any]) -> Config: + return Config.model_validate(dicts.apply_defaults(copy.deepcopy(values), defaults = self.model_dump())) diff --git a/src/kleinanzeigen_bot/resources/translations.de.yaml b/src/kleinanzeigen_bot/resources/translations.de.yaml index 0f0b443..cd2f48a 100644 --- a/src/kleinanzeigen_bot/resources/translations.de.yaml +++ b/src/kleinanzeigen_bot/resources/translations.de.yaml @@ -457,6 +457,9 @@ kleinanzeigen_bot/utils/web_scraping_mixin.py: " -> Browser profile name: %s": " -> Browser-Profilname: %s" " -> Browser user data dir: %s": " -> Browser-Benutzerdatenverzeichnis: %s" " -> Custom Browser argument: %s": " -> Benutzerdefiniertes Browser-Argument: %s" + "Ignoring empty --user-data-dir= argument; falling back to configured user_data_dir.": "Ignoriere leeres --user-data-dir= Argument; verwende konfiguriertes user_data_dir." + "Configured browser.user_data_dir (%s) does not match --user-data-dir argument (%s); using the argument value.": "Konfiguriertes browser.user_data_dir (%s) stimmt nicht mit --user-data-dir Argument (%s) überein; verwende Argument-Wert." + "Remote debugging detected, but browser configuration looks invalid: %s": "Remote-Debugging erkannt, aber Browser-Konfiguration scheint ungültig: %s" " -> Setting chrome prefs [%s]...": " -> Setze Chrome-Einstellungen [%s]..." " -> Adding Browser extension: [%s]": " -> Füge Browser-Erweiterung hinzu: [%s]" "Failed to connect to browser. This error often occurs when:": "Fehler beim Verbinden mit dem Browser. Dieser Fehler tritt häufig auf, wenn:" @@ -546,8 +549,8 @@ kleinanzeigen_bot/utils/web_scraping_mixin.py: " -> Unexpected error during browser version validation, skipping: %s": " -> Unerwarteter Fehler bei Browser-Versionsvalidierung, wird übersprungen: %s" _diagnose_chrome_version_issues: - "(info) %s version from binary: %s %s (major: %d)": "(Info) %s-Version von Binärdatei: %s %s (Hauptversion: %d)" - "(info) %s version from remote debugging: %s %s (major: %d)": "(Info) %s-Version von Remote-Debugging: %s %s (Hauptversion: %d)" + "(info) %s version from binary: %s (major: %d)": "(Info) %s-Version von Binärdatei: %s (Hauptversion: %d)" + "(info) %s version from remote debugging: %s (major: %d)": "(Info) %s-Version von Remote-Debugging: %s (Hauptversion: %d)" "(info) %s 136+ detected - security validation required": "(Info) %s 136+ erkannt - Sicherheitsvalidierung erforderlich" "(info) %s pre-136 detected - no special security requirements": "(Info) %s vor 136 erkannt - keine besonderen Sicherheitsanforderungen" "(info) Remote %s 136+ detected - validating configuration": "(Info) Remote %s 136+ erkannt - validiere Konfiguration" diff --git a/src/kleinanzeigen_bot/update_checker.py b/src/kleinanzeigen_bot/update_checker.py index ad5cebd..7a3fbec 100644 --- a/src/kleinanzeigen_bot/update_checker.py +++ b/src/kleinanzeigen_bot/update_checker.py @@ -31,7 +31,7 @@ colorama.init() class UpdateChecker: """Checks for updates to the bot.""" - def __init__(self, config: "Config", installation_mode: str | xdg_paths.InstallationMode = "portable") -> None: + def __init__(self, config:"Config", installation_mode:str | xdg_paths.InstallationMode = "portable") -> None: """Initialize the update checker. Args: @@ -55,7 +55,7 @@ class UpdateChecker: """Return the effective timeout for HTTP calls.""" return self.config.timeouts.effective("update_check") - def _get_commit_hash(self, version: str) -> str | None: + def _get_commit_hash(self, version:str) -> str | None: """Extract the commit hash from a version string. Args: @@ -68,7 +68,7 @@ class UpdateChecker: return version.split("+")[1] return None - def _resolve_commitish(self, commitish: str) -> tuple[str | None, datetime | None]: + def _resolve_commitish(self, commitish:str) -> tuple[str | None, datetime | None]: """Resolve a commit-ish to a full commit hash and date. Args: @@ -80,7 +80,7 @@ class UpdateChecker: try: response = requests.get( f"https://api.github.com/repos/Second-Hand-Friends/kleinanzeigen-bot/commits/{commitish}", - timeout=self._request_timeout(), + timeout = self._request_timeout(), ) response.raise_for_status() data = response.json() @@ -96,7 +96,7 @@ class UpdateChecker: logger.warning(_("Could not resolve commit '%s': %s"), commitish, e) return None, None - def _get_short_commit_hash(self, commit: str) -> str: + def _get_short_commit_hash(self, commit:str) -> str: """Get the short version of a commit hash. Args: @@ -107,7 +107,7 @@ class UpdateChecker: """ return commit[:7] - def _commits_match(self, local_commit: str, release_commit: str) -> bool: + def _commits_match(self, local_commit:str, release_commit:str) -> bool: """Determine whether two commits refer to the same hash. This accounts for short vs. full hashes (e.g. 7 chars vs. 40 chars). @@ -120,7 +120,7 @@ class UpdateChecker: return True return len(release_commit) < len(local_commit) and local_commit.startswith(release_commit) - def check_for_updates(self, *, skip_interval_check: bool = False) -> None: + def check_for_updates(self, *, skip_interval_check:bool = False) -> None: """Check for updates to the bot. Args: @@ -147,7 +147,7 @@ class UpdateChecker: try: if self.config.update_check.channel == "latest": # Use /releases/latest endpoint for stable releases - response = requests.get("https://api.github.com/repos/Second-Hand-Friends/kleinanzeigen-bot/releases/latest", timeout=self._request_timeout()) + response = requests.get("https://api.github.com/repos/Second-Hand-Friends/kleinanzeigen-bot/releases/latest", timeout = self._request_timeout()) response.raise_for_status() release = response.json() # Defensive: ensure it's not a prerelease @@ -156,7 +156,7 @@ class UpdateChecker: return elif self.config.update_check.channel == "preview": # Use /releases endpoint and select the most recent prerelease - response = requests.get("https://api.github.com/repos/Second-Hand-Friends/kleinanzeigen-bot/releases", timeout=self._request_timeout()) + response = requests.get("https://api.github.com/repos/Second-Hand-Friends/kleinanzeigen-bot/releases", timeout = self._request_timeout()) response.raise_for_status() releases = response.json() # Find the most recent prerelease diff --git a/src/kleinanzeigen_bot/utils/web_scraping_mixin.py b/src/kleinanzeigen_bot/utils/web_scraping_mixin.py index 7a591fa..43189c2 100644 --- a/src/kleinanzeigen_bot/utils/web_scraping_mixin.py +++ b/src/kleinanzeigen_bot/utils/web_scraping_mixin.py @@ -4,6 +4,7 @@ import asyncio, enum, inspect, json, os, platform, secrets, shutil, subprocess, urllib.request # isort: skip # noqa: S404 from collections.abc import Awaitable, Callable, Coroutine, Iterable from gettext import gettext as _ +from pathlib import Path from typing import Any, Final, Optional, cast try: @@ -22,7 +23,7 @@ from nodriver.core.tab import Tab as Page from kleinanzeigen_bot.model.config_model import Config as BotConfig from kleinanzeigen_bot.model.config_model import TimeoutConfig -from . import files, loggers, net +from . import files, loggers, net, xdg_paths from .chrome_version_detector import ( ChromeVersionInfo, detect_chrome_version_from_binary, @@ -40,6 +41,28 @@ if TYPE_CHECKING: _KEY_VALUE_PAIR_SIZE = 2 +def _resolve_user_data_dir_paths(arg_value:str, config_value:str) -> tuple[Any, Any]: + """Resolve the argument and config user_data_dir paths for comparison.""" + try: + return ( + Path(arg_value).expanduser().resolve(), + Path(config_value).expanduser().resolve(), + ) + except OSError as exc: + LOG.debug("Failed to resolve user_data_dir paths for comparison: %s", exc) + return None, None + + +def _has_non_empty_user_data_dir_arg(args:Iterable[str]) -> bool: + for arg in args: + if not arg.startswith("--user-data-dir="): + continue + raw = arg.split("=", maxsplit = 1)[1].strip().strip('"').strip("'") + if raw: + return True + return False + + def _is_remote_object(obj:Any) -> TypeGuard["RemoteObject"]: """Type guard to check if an object is a RemoteObject.""" return hasattr(obj, "__class__") and "RemoteObject" in str(type(obj)) @@ -58,7 +81,7 @@ __all__ = [ LOG:Final[loggers.Logger] = loggers.get_logger(__name__) # see https://api.jquery.com/category/selectors/ -METACHAR_ESCAPER:Final[dict[int, str]] = str.maketrans({ch: f"\\{ch}" for ch in '!"#$%&\'()*+,./:;<=>?@[\\]^`{|}~'}) +METACHAR_ESCAPER:Final[dict[int, str]] = str.maketrans({ch: f"\\{ch}" for ch in "!\"#$%&'()*+,./:;<=>?@[\\]^`{|}~"}) def _is_admin() -> bool: @@ -90,7 +113,6 @@ class Is(enum.Enum): class BrowserConfig: - def __init__(self) -> None: self.arguments:Iterable[str] = [] self.binary_location:str | None = None @@ -102,37 +124,27 @@ class BrowserConfig: def _write_initial_prefs(prefs_file:str) -> None: with open(prefs_file, "w", encoding = "UTF-8") as fd: - json.dump({ - "credentials_enable_service": False, - "enable_do_not_track": True, - "google": { - "services": { - "consented_to_sync": False - } - }, - "profile": { - "default_content_setting_values": { - "popups": 0, - "notifications": 2 # 1 = allow, 2 = block browser notifications + json.dump( + { + "credentials_enable_service": False, + "enable_do_not_track": True, + "google": {"services": {"consented_to_sync": False}}, + "profile": { + "default_content_setting_values": { + "popups": 0, + "notifications": 2, # 1 = allow, 2 = block browser notifications + }, + "password_manager_enabled": False, }, - "password_manager_enabled": False + "signin": {"allowed": False}, + "translate_site_blacklist": ["www.kleinanzeigen.de"], + "devtools": {"preferences": {"currentDockState": '"bottom"'}}, }, - "signin": { - "allowed": False - }, - "translate_site_blacklist": [ - "www.kleinanzeigen.de" - ], - "devtools": { - "preferences": { - "currentDockState": '"bottom"' - } - } - }, fd) + fd, + ) class WebScrapingMixin: - def __init__(self) -> None: self.browser_config:Final[BrowserConfig] = BrowserConfig() self.browser:Browser = None # pyright: ignore[reportAttributeAccessIssue] @@ -140,6 +152,11 @@ class WebScrapingMixin: self._default_timeout_config:TimeoutConfig | None = None self.config:BotConfig = cast(BotConfig, None) + @property + def _installation_mode(self) -> str: + """Get installation mode with fallback to portable.""" + return getattr(self, "installation_mode_or_portable", "portable") + def _get_timeout_config(self) -> TimeoutConfig: config = getattr(self, "config", None) timeouts:TimeoutConfig | None = None @@ -172,12 +189,7 @@ class WebScrapingMixin: return 1 + cfg.retry_max_attempts async def _run_with_timeout_retries( - self, - operation:Callable[[float], Awaitable[T]], - *, - description:str, - key:str = "default", - override:float | None = None + self, operation:Callable[[float], Awaitable[T]], *, description:str, key:str = "default", override:float | None = None ) -> T: """ Execute an async callable with retry/backoff handling for TimeoutError. @@ -191,13 +203,7 @@ class WebScrapingMixin: except TimeoutError: if attempt >= attempts - 1: raise - LOG.debug( - "Retrying %s after TimeoutError (attempt %d/%d, timeout %.1fs)", - description, - attempt + 1, - attempts, - effective_timeout - ) + LOG.debug("Retrying %s after TimeoutError (attempt %d/%d, timeout %.1fs)", description, attempt + 1, attempts, effective_timeout) raise TimeoutError(f"{description} failed without executing operation") @@ -210,8 +216,25 @@ class WebScrapingMixin: self.browser_config.binary_location = self.get_compatible_browser() LOG.info(" -> Browser binary location: %s", self.browser_config.binary_location) + has_remote_debugging = any(arg.startswith("--remote-debugging-port=") for arg in self.browser_config.arguments) + is_test_environment = bool(os.environ.get("PYTEST_CURRENT_TEST")) + + if ( + not (self.browser_config.user_data_dir and self.browser_config.user_data_dir.strip()) + and not _has_non_empty_user_data_dir_arg(self.browser_config.arguments) + and not has_remote_debugging + and not is_test_environment + ): + self.browser_config.user_data_dir = str(xdg_paths.get_browser_profile_path(self._installation_mode)) + # Chrome version detection and validation - await self._validate_chrome_version_configuration() + if has_remote_debugging: + try: + await self._validate_chrome_version_configuration() + except AssertionError as exc: + LOG.warning(_("Remote debugging detected, but browser configuration looks invalid: %s"), exc) + else: + await self._validate_chrome_version_configuration() ######################################################## # check if an existing browser instance shall be used... @@ -229,10 +252,12 @@ class WebScrapingMixin: # Enhanced port checking with retry logic port_available = await self._check_port_with_retry(remote_host, remote_port) - ensure(port_available, + ensure( + port_available, f"Browser process not reachable at {remote_host}:{remote_port}. " f"Start the browser with --remote-debugging-port={remote_port} or remove this port from your config.yaml. " - f"Make sure the browser is running and the port is not blocked by firewall.") + f"Make sure the browser is running and the port is not blocked by firewall.", + ) try: cfg = NodriverConfig( @@ -255,8 +280,7 @@ class WebScrapingMixin: LOG.error("Troubleshooting steps:") LOG.error("1. Close all browser instances and try again") LOG.error("2. Remove the user_data_dir configuration temporarily") - LOG.error("3. Start browser manually with: %s --remote-debugging-port=%d", - self.browser_config.binary_location, remote_port) + LOG.error("3. Start browser manually with: %s --remote-debugging-port=%d", self.browser_config.binary_location, remote_port) LOG.error("4. Check if any antivirus or security software is blocking the connection") raise @@ -274,13 +298,11 @@ class WebScrapingMixin: "--disable-sync", "--no-experiments", "--disable-search-engine-choice-screen", - "--disable-features=MediaRouter", "--use-mock-keychain", - "--test-type", # https://stackoverflow.com/a/36746675/5116073 # https://chromium.googlesource.com/chromium/src/+/master/net/dns/README.md#request-remapping - '--host-resolver-rules="MAP connect.facebook.net 127.0.0.1, MAP securepubads.g.doubleclick.net 127.0.0.1, MAP www.googletagmanager.com 127.0.0.1"' + '--host-resolver-rules="MAP connect.facebook.net 127.0.0.1, MAP securepubads.g.doubleclick.net 127.0.0.1, MAP www.googletagmanager.com 127.0.0.1"', ] is_edge = "edge" in self.browser_config.binary_location.lower() @@ -295,10 +317,36 @@ class WebScrapingMixin: LOG.info(" -> Browser profile name: %s", self.browser_config.profile_name) browser_args.append(f"--profile-directory={self.browser_config.profile_name}") + user_data_dir_from_args:str | None = None for browser_arg in self.browser_config.arguments: LOG.info(" -> Custom Browser argument: %s", browser_arg) + if browser_arg.startswith("--user-data-dir="): + raw = browser_arg.split("=", maxsplit = 1)[1].strip().strip('"').strip("'") + if not raw: + LOG.warning(_("Ignoring empty --user-data-dir= argument; falling back to configured user_data_dir.")) + continue + user_data_dir_from_args = raw + continue browser_args.append(browser_arg) + effective_user_data_dir = user_data_dir_from_args or self.browser_config.user_data_dir + if user_data_dir_from_args and self.browser_config.user_data_dir: + arg_path, cfg_path = await asyncio.get_running_loop().run_in_executor( + None, + _resolve_user_data_dir_paths, + user_data_dir_from_args, + self.browser_config.user_data_dir, + ) + if arg_path is None or cfg_path is None or arg_path != cfg_path: + LOG.warning( + _("Configured browser.user_data_dir (%s) does not match --user-data-dir argument (%s); using the argument value."), + self.browser_config.user_data_dir, + user_data_dir_from_args, + ) + if not effective_user_data_dir and not is_test_environment: + effective_user_data_dir = str(xdg_paths.get_browser_profile_path(self._installation_mode)) + self.browser_config.user_data_dir = effective_user_data_dir + if not loggers.is_debug(LOG): browser_args.append("--log-level=3") # INFO: 0, WARNING: 1, ERROR: 2, FATAL: 3 @@ -309,7 +357,7 @@ class WebScrapingMixin: headless = False, browser_executable_path = self.browser_config.binary_location, browser_args = browser_args, - user_data_dir = self.browser_config.user_data_dir + user_data_dir = self.browser_config.user_data_dir, ) # already logged by nodriver: @@ -371,8 +419,7 @@ class WebScrapingMixin: return True if attempt < max_retries - 1: - LOG.debug("Port %s:%s not available, retrying in %.1f seconds (attempt %d/%d)", - host, port, retry_delay, attempt + 1, max_retries) + LOG.debug("Port %s:%s not available, retrying in %.1f seconds (attempt %d/%d)", host, port, retry_delay, attempt + 1, max_retries) await asyncio.sleep(retry_delay) return False @@ -522,12 +569,7 @@ class WebScrapingMixin: browser_paths:list[str | None] = [] match platform.system(): case "Linux": - browser_paths = [ - shutil.which("chromium"), - shutil.which("chromium-browser"), - shutil.which("google-chrome"), - shutil.which("microsoft-edge") - ] + browser_paths = [shutil.which("chromium"), shutil.which("chromium-browser"), shutil.which("google-chrome"), shutil.which("microsoft-edge")] case "Darwin": browser_paths = [ @@ -540,18 +582,15 @@ class WebScrapingMixin: browser_paths = [ os.environ.get("PROGRAMFILES", "C:\\Program Files") + r"\Microsoft\Edge\Application\msedge.exe", os.environ.get("PROGRAMFILES(X86)", "C:\\Program Files (x86)") + r"\Microsoft\Edge\Application\msedge.exe", - os.environ["PROGRAMFILES"] + r"\Chromium\Application\chrome.exe", os.environ["PROGRAMFILES(X86)"] + r"\Chromium\Application\chrome.exe", os.environ["LOCALAPPDATA"] + r"\Chromium\Application\chrome.exe", - os.environ["PROGRAMFILES"] + r"\Chrome\Application\chrome.exe", os.environ["PROGRAMFILES(X86)"] + r"\Chrome\Application\chrome.exe", os.environ["LOCALAPPDATA"] + r"\Chrome\Application\chrome.exe", - shutil.which("msedge.exe"), shutil.which("chromium.exe"), - shutil.which("chrome.exe") + shutil.which("chrome.exe"), ] case _ as os_name: @@ -563,8 +602,14 @@ class WebScrapingMixin: raise AssertionError(_("Installed browser could not be detected")) - async def web_await(self, condition:Callable[[], T | Never | Coroutine[Any, Any, T | Never]], *, - timeout:int | float | None = None, timeout_error_message:str = "", apply_multiplier:bool = True) -> T: + async def web_await( + self, + condition:Callable[[], T | Never | Coroutine[Any, Any, T | Never]], + *, + timeout:int | float | None = None, + timeout_error_message:str = "", + apply_multiplier:bool = True, + ) -> T: """ Blocks/waits until the given condition is met. @@ -604,7 +649,9 @@ class WebScrapingMixin: return elem.attrs.get("disabled") is not None async def is_displayed(elem:Element) -> bool: - return cast(bool, await elem.apply(""" + return cast( + bool, + await elem.apply(""" function (element) { var style = window.getComputedStyle(element); return style.display !== 'none' @@ -613,7 +660,8 @@ class WebScrapingMixin: && element.offsetWidth > 0 && element.offsetHeight > 0 } - """)) + """), + ) elem:Element = await self.web_find(selector_type, selector_value, timeout = timeout) @@ -627,7 +675,9 @@ class WebScrapingMixin: case Is.READONLY: return elem.attrs.get("readonly") is not None case Is.SELECTED: - return cast(bool, await elem.apply(""" + return cast( + bool, + await elem.apply(""" function (element) { if (element.tagName.toLowerCase() === 'input') { if (element.type === 'checkbox' || element.type === 'radio') { @@ -636,7 +686,8 @@ class WebScrapingMixin: } return false } - """)) + """), + ) raise AssertionError(_("Unsupported attribute: %s") % attr) async def web_click(self, selector_type:By, selector_value:str, *, timeout:int | float | None = None) -> Element: @@ -743,11 +794,8 @@ class WebScrapingMixin: async def attempt(effective_timeout:float) -> Element: return await self._web_find_once(selector_type, selector_value, effective_timeout, parent = parent) - return await self._run_with_timeout_retries( - attempt, - description = f"web_find({selector_type.name}, {selector_value})", - key = "default", - override = timeout + return await self._run_with_timeout_retries( # noqa: E501 + attempt, description = f"web_find({selector_type.name}, {selector_value})", key = "default", override = timeout ) async def web_find_all(self, selector_type:By, selector_value:str, *, parent:Element | None = None, timeout:int | float | None = None) -> list[Element]: @@ -762,10 +810,7 @@ class WebScrapingMixin: return await self._web_find_all_once(selector_type, selector_value, effective_timeout, parent = parent) return await self._run_with_timeout_retries( - attempt, - description = f"web_find_all({selector_type.name}, {selector_value})", - key = "default", - override = timeout + attempt, description = f"web_find_all({selector_type.name}, {selector_value})", key = "default", override = timeout ) async def _web_find_once(self, selector_type:By, selector_value:str, timeout:float, *, parent:Element | None = None) -> Element: @@ -778,40 +823,46 @@ class WebScrapingMixin: lambda: self.page.query_selector(f"#{escaped_id}", parent), timeout = timeout, timeout_error_message = f"No HTML element found with ID '{selector_value}'{timeout_suffix}", - apply_multiplier = False) + apply_multiplier = False, + ) case By.CLASS_NAME: escaped_classname = selector_value.translate(METACHAR_ESCAPER) return await self.web_await( lambda: self.page.query_selector(f".{escaped_classname}", parent), timeout = timeout, timeout_error_message = f"No HTML element found with CSS class '{selector_value}'{timeout_suffix}", - apply_multiplier = False) + apply_multiplier = False, + ) case By.TAG_NAME: return await self.web_await( lambda: self.page.query_selector(selector_value, parent), timeout = timeout, timeout_error_message = f"No HTML element found of tag <{selector_value}>{timeout_suffix}", - apply_multiplier = False) + apply_multiplier = False, + ) case By.CSS_SELECTOR: return await self.web_await( lambda: self.page.query_selector(selector_value, parent), timeout = timeout, timeout_error_message = f"No HTML element found using CSS selector '{selector_value}'{timeout_suffix}", - apply_multiplier = False) + apply_multiplier = False, + ) case By.TEXT: ensure(not parent, f"Specifying a parent element currently not supported with selector type: {selector_type}") return await self.web_await( lambda: self.page.find_element_by_text(selector_value, best_match = True), timeout = timeout, timeout_error_message = f"No HTML element found containing text '{selector_value}'{timeout_suffix}", - apply_multiplier = False) + apply_multiplier = False, + ) case By.XPATH: ensure(not parent, f"Specifying a parent element currently not supported with selector type: {selector_type}") return await self.web_await( lambda: self.page.find_element_by_text(selector_value, best_match = True), timeout = timeout, timeout_error_message = f"No HTML element found using XPath '{selector_value}'{timeout_suffix}", - apply_multiplier = False) + apply_multiplier = False, + ) raise AssertionError(_("Unsupported selector type: %s") % selector_type) @@ -825,33 +876,38 @@ class WebScrapingMixin: lambda: self.page.query_selector_all(f".{escaped_classname}", parent), timeout = timeout, timeout_error_message = f"No HTML elements found with CSS class '{selector_value}'{timeout_suffix}", - apply_multiplier = False) + apply_multiplier = False, + ) case By.CSS_SELECTOR: return await self.web_await( lambda: self.page.query_selector_all(selector_value, parent), timeout = timeout, timeout_error_message = f"No HTML elements found using CSS selector '{selector_value}'{timeout_suffix}", - apply_multiplier = False) + apply_multiplier = False, + ) case By.TAG_NAME: return await self.web_await( lambda: self.page.query_selector_all(selector_value, parent), timeout = timeout, timeout_error_message = f"No HTML elements found of tag <{selector_value}>{timeout_suffix}", - apply_multiplier = False) + apply_multiplier = False, + ) case By.TEXT: ensure(not parent, f"Specifying a parent element currently not supported with selector type: {selector_type}") return await self.web_await( lambda: self.page.find_elements_by_text(selector_value), timeout = timeout, timeout_error_message = f"No HTML elements found containing text '{selector_value}'{timeout_suffix}", - apply_multiplier = False) + apply_multiplier = False, + ) case By.XPATH: ensure(not parent, f"Specifying a parent element currently not supported with selector type: {selector_type}") return await self.web_await( lambda: self.page.find_elements_by_text(selector_value), timeout = timeout, timeout_error_message = f"No HTML elements found using XPath '{selector_value}'{timeout_suffix}", - apply_multiplier = False) + apply_multiplier = False, + ) raise AssertionError(_("Unsupported selector type: %s") % selector_type) @@ -885,11 +941,12 @@ class WebScrapingMixin: lambda: self.web_execute("document.readyState == 'complete'"), timeout = page_timeout, timeout_error_message = f"Page did not finish loading within {page_timeout} seconds.", - apply_multiplier = False + apply_multiplier = False, ) async def web_text(self, selector_type:By, selector_value:str, *, parent:Element | None = None, timeout:int | float | None = None) -> str: - return str(await (await self.web_find(selector_type, selector_value, parent = parent, timeout = timeout)).apply(""" + return str( + await (await self.web_find(selector_type, selector_value, parent = parent, timeout = timeout)).apply(""" function (elem) { let sel = window.getSelection() sel.removeAllRanges() @@ -900,16 +957,19 @@ class WebScrapingMixin: sel.removeAllRanges() return visibleText } - """)) + """) + ) async def web_sleep(self, min_ms:int = 1_000, max_ms:int = 2_500) -> None: duration = max_ms <= min_ms and min_ms or secrets.randbelow(max_ms - min_ms) + min_ms - LOG.log(loggers.INFO if duration > 1_500 else loggers.DEBUG, # noqa: PLR2004 Magic value used in comparison - " ... pausing for %d ms ...", duration) + LOG.log( + loggers.INFO if duration > 1_500 else loggers.DEBUG, # noqa: PLR2004 Magic value used in comparison + " ... pausing for %d ms ...", + duration, + ) await self.page.sleep(duration / 1_000) - async def web_request(self, url:str, method:str = "GET", valid_response_codes:int | Iterable[int] = 200, - headers:dict[str, str] | None = None) -> Any: + async def web_request(self, url:str, method:str = "GET", valid_response_codes:int | Iterable[int] = 200, headers:dict[str, str] | None = None) -> Any: method = method.upper() LOG.debug(" -> HTTP %s [%s]...", method, url) response = await self.web_execute(f""" @@ -933,9 +993,10 @@ class WebScrapingMixin: valid_response_codes = [valid_response_codes] ensure( response["statusCode"] in valid_response_codes, - f'Invalid response "{response["statusCode"]} response["statusMessage"]" received for HTTP {method} to {url}' + f'Invalid response "{response["statusCode"]} {response["statusMessage"]}" received for HTTP {method} to {url}', ) return response + # pylint: enable=dangerous-default-value async def web_scroll_page_down(self, scroll_length:int = 10, scroll_speed:int = 10_000, *, scroll_back_top:bool = False) -> None: @@ -968,8 +1029,9 @@ class WebScrapingMixin: :raises UnexpectedTagNameException: if element is not a