From 6e562164b80d981bf528da68bd1d331b4c8230b7 Mon Sep 17 00:00:00 2001 From: klangborste Date: Sun, 15 Mar 2026 07:55:52 +0100 Subject: [PATCH] fix: Auth0-Login-Migration und GDPR-Banner-Fix (#870) --- src/kleinanzeigen_bot/__init__.py | 530 +++++++++++----- .../resources/translations.de.yaml | 53 +- tests/unit/test_init.py | 591 +++++++++++++----- tests/unit/test_json_pagination.py | 61 ++ 4 files changed, 887 insertions(+), 348 deletions(-) diff --git a/src/kleinanzeigen_bot/__init__.py b/src/kleinanzeigen_bot/__init__.py index 1ca0048..6506941 100644 --- a/src/kleinanzeigen_bot/__init__.py +++ b/src/kleinanzeigen_bot/__init__.py @@ -38,7 +38,10 @@ _LOGIN_DETECTION_SELECTORS:Final[list[tuple["By", str]]] = [ (By.CLASS_NAME, "mr-medium"), (By.ID, "user-email"), ] -_LOGIN_DETECTION_SELECTOR_LABELS:Final[tuple[str, ...]] = ("user_info_primary", "user_info_secondary") +_LOGGED_OUT_CTA_SELECTORS:Final[list[tuple["By", str]]] = [ + (By.CSS_SELECTOR, 'a[href*="einloggen"]'), + (By.CSS_SELECTOR, 'a[href*="/m-einloggen"]'), +] colorama.just_fix_windows_console() @@ -997,95 +1000,203 @@ class KleinanzeigenBot(WebScrapingMixin): # noqa: PLR0904 await ainput(_("Press a key to continue...")) except TimeoutError: - # No captcha detected within timeout. 
- pass + page_context = "login page" if is_login_page else "publish flow" + LOG.debug("No captcha detected within timeout on %s", page_context) async def login(self) -> None: + sso_navigation_timeout = self._timeout("page_load") + pre_login_gdpr_timeout = self._timeout("quick_dom") + LOG.info("Checking if already logged in...") await self.web_open(f"{self.root_url}") - if getattr(self, "page", None) is not None: - LOG.debug("Current page URL after opening homepage: %s", self.page.url) + try: + await self._click_gdpr_banner(timeout = pre_login_gdpr_timeout) + except TimeoutError: + LOG.debug("No GDPR banner detected before login") + + state = await self.get_login_state(capture_diagnostics = False) + if state == LoginState.LOGGED_IN: + LOG.info("Already logged in. Skipping login.") + return + + LOG.debug("Navigating to SSO login page (Auth0)...") + # m-einloggen-sso.html triggers immediate server-side redirect to Auth0 + # This avoids waiting for JS on m-einloggen.html which may not execute in headless mode + try: + await self.web_open(f"{self.root_url}/m-einloggen-sso.html", timeout = sso_navigation_timeout) + except TimeoutError: + LOG.warning("Timeout navigating to SSO login page after %.1fs", sso_navigation_timeout) + await self._capture_login_detection_diagnostics_if_enabled() + raise + + self._login_detection_diagnostics_captured = False + + try: + await self.fill_login_data_and_send() + await self.handle_after_login_logic() + except (AssertionError, TimeoutError): + # AssertionError is intentionally part of auth-boundary control flow so + # diagnostics are captured before the original error is re-raised. + await self._capture_login_detection_diagnostics_if_enabled() + raise await self._dismiss_consent_banner() state = await self.get_login_state() if state == LoginState.LOGGED_IN: - LOG.info("Already logged in as [%s]. 
Skipping login.", self.config.login.username) + LOG.info("Login confirmed.") return - if state == LoginState.UNKNOWN: - LOG.warning("Login state is UNKNOWN - cannot determine if already logged in. Skipping login attempt.") + current_url = self._current_page_url() + LOG.warning("Login state after attempt is %s (url=%s)", state.name, current_url) + await self._capture_login_detection_diagnostics_if_enabled() + raise AssertionError(_("Login could not be confirmed after Auth0 flow (state=%s, url=%s)") % (state.name, current_url)) + + def _current_page_url(self) -> str: + page = getattr(self, "page", None) + if page is None: + return "unknown" + url = getattr(page, "url", None) + if not isinstance(url, str) or not url: + return "unknown" + + parsed = urllib_parse.urlparse(url) + host = parsed.hostname or parsed.netloc.split("@")[-1] + netloc = f"{host}:{parsed.port}" if parsed.port is not None and host else host + sanitized = urllib_parse.urlunparse((parsed.scheme, netloc, parsed.path, "", "", "")) + return sanitized or "unknown" + + async def _wait_for_auth0_login_context(self) -> None: + redirect_timeout = self._timeout("login_detection") + try: + await self.web_await( + lambda: "login.kleinanzeigen.de" in self._current_page_url() or "/u/login" in self._current_page_url(), + timeout = redirect_timeout, + timeout_error_message = f"Auth0 redirect did not start within {redirect_timeout} seconds", + apply_multiplier = False, + ) + except TimeoutError as ex: + current_url = self._current_page_url() + raise AssertionError(_("Auth0 redirect not detected (url=%s)") % current_url) from ex + + async def _wait_for_auth0_password_step(self) -> None: + password_step_timeout = self._timeout("login_detection") + try: + await self.web_await( + lambda: "/u/login/password" in self._current_page_url(), + timeout = password_step_timeout, + timeout_error_message = f"Auth0 password page not reached within {password_step_timeout} seconds", + apply_multiplier = False, + ) + except 
TimeoutError as ex: + current_url = self._current_page_url() + raise AssertionError(_("Auth0 password step not reached (url=%s)") % current_url) from ex + + async def _wait_for_post_auth0_submit_transition(self) -> None: + post_submit_timeout = self._timeout("login_detection") + quick_dom_timeout = self._timeout("quick_dom") + fallback_max_ms = max(700, int(quick_dom_timeout * 1_000)) + fallback_min_ms = max(300, fallback_max_ms // 2) + + try: + await self.web_await( + lambda: self._is_valid_post_auth0_destination(self._current_page_url()), + timeout = post_submit_timeout, + timeout_error_message = f"Auth0 post-submit transition did not complete within {post_submit_timeout} seconds", + apply_multiplier = False, + ) + return + except TimeoutError: + LOG.debug("Post-submit transition not detected via URL, checking logged-in selectors") + + login_confirmed = False + try: + login_confirmed = await asyncio.wait_for(self.is_logged_in(include_probe = False), timeout = post_submit_timeout) + except (TimeoutError, asyncio.TimeoutError): + LOG.debug("Post-submit login verification did not complete within %.1fs", post_submit_timeout) + + if login_confirmed: return - LOG.info("Opening login page...") - await self.web_open(f"{self.root_url}/m-einloggen.html?targetUrl=/") + LOG.debug("Auth0 post-submit verification remained inconclusive; applying bounded fallback pause") + await self.web_sleep(min_ms = fallback_min_ms, max_ms = fallback_max_ms) - await self.fill_login_data_and_send() - await self.handle_after_login_logic() + try: + if await asyncio.wait_for(self.is_logged_in(include_probe = False), timeout = quick_dom_timeout): + return + except (TimeoutError, asyncio.TimeoutError): + LOG.debug("Final post-submit login confirmation did not complete within %.1fs", quick_dom_timeout) - # Sometimes a second login is required - state = await self.get_login_state() - if state == LoginState.UNKNOWN: - LOG.warning("Login state is UNKNOWN after first login attempt - cannot determine 
login status. Aborting login process.") - return + current_url = self._current_page_url() + raise TimeoutError(_("Auth0 post-submit verification remained inconclusive (url=%s)") % current_url) - if state == LoginState.LOGGED_OUT: - LOG.debug("First login attempt did not succeed, trying second login attempt") - await self.fill_login_data_and_send() - await self.handle_after_login_logic() + def _is_valid_post_auth0_destination(self, url:str) -> bool: + if not url or url in {"unknown", "about:blank"}: + return False - state = await self.get_login_state() - if state == LoginState.LOGGED_IN: - LOG.debug("Second login attempt succeeded") - else: - LOG.warning("Second login attempt also failed - login may not have succeeded") + parsed = urllib_parse.urlparse(url) + host = (parsed.hostname or "").lower() + path = parsed.path.lower() + + if host != "kleinanzeigen.de" and not host.endswith(".kleinanzeigen.de"): + return False + if host == "login.kleinanzeigen.de": + return False + if path.startswith("/u/login"): + return False + + return "error" not in path async def fill_login_data_and_send(self) -> None: - LOG.info("Logging in as [%s]...", self.config.login.username) - await self.web_input(By.ID, "login-email", self.config.login.username) + """Auth0 2-step login via m-einloggen-sso.html (server-side redirect, no JS needed). 
- # clearing password input in case browser has stored login data set - await self.web_input(By.ID, "login-password", "") - await self.web_input(By.ID, "login-password", self.config.login.password) + Step 1: /u/login/identifier - email + Step 2: /u/login/password - password + """ + LOG.info("Logging in...") + await self._wait_for_auth0_login_context() + + # Step 1: email identifier + LOG.debug("Auth0 Step 1: entering email...") + await self.web_input(By.ID, "username", self.config.login.username) + await self.web_click(By.CSS_SELECTOR, "button[type='submit']") + + # Step 2: wait for password page then enter password + LOG.debug("Waiting for Auth0 password page...") + await self._wait_for_auth0_password_step() + + LOG.debug("Auth0 Step 2: entering password...") + await self.web_input(By.CSS_SELECTOR, "input[type='password']", self.config.login.password) await self.check_and_wait_for_captcha(is_login_page = True) - - await self.web_click(By.CSS_SELECTOR, "form#login-form button[type='submit']") + await self.web_click(By.CSS_SELECTOR, "button[type='submit']") + await self._wait_for_post_auth0_submit_transition() + LOG.debug("Auth0 login submitted.") async def handle_after_login_logic(self) -> None: try: - sms_timeout = self._timeout("sms_verification") - await self.web_find(By.TEXT, "Wir haben dir gerade einen 6-stelligen Code für die Telefonnummer", timeout = sms_timeout) - LOG.warning("############################################") - LOG.warning("# Device verification message detected. Please follow the instruction displayed in the Browser.") - LOG.warning("############################################") - await ainput(_("Press ENTER when done...")) + await self._check_sms_verification() except TimeoutError: - # No SMS verification prompt detected. 
- pass + LOG.debug("No SMS verification prompt detected after login") try: - email_timeout = self._timeout("email_verification") - await self.web_find(By.TEXT, "Um dein Konto zu schützen haben wir dir eine E-Mail geschickt", timeout = email_timeout) - LOG.warning("############################################") - LOG.warning("# Device verification message detected. Please follow the instruction displayed in the Browser.") - LOG.warning("############################################") - await ainput(_("Press ENTER when done...")) + await self._check_email_verification() except TimeoutError: - # No email verification prompt detected. - pass + LOG.debug("No email verification prompt detected after login") try: - LOG.info("Handling GDPR disclaimer...") - gdpr_timeout = self._timeout("gdpr_prompt") - await self.web_find(By.ID, "gdpr-banner-accept", timeout = gdpr_timeout) - await self.web_click(By.ID, "gdpr-banner-cmp-button") - await self.web_click( - By.XPATH, "//div[@id='ConsentManagementPage']//*//button//*[contains(., 'Alle ablehnen und fortfahren')]", timeout = gdpr_timeout - ) + LOG.debug("Handling GDPR disclaimer...") + await self._click_gdpr_banner() except TimeoutError: - # GDPR banner not shown within timeout. - pass + LOG.debug("GDPR banner not found or timed out") + + async def _check_sms_verification(self) -> None: + sms_timeout = self._timeout("sms_verification") + await self.web_find(By.TEXT, "Wir haben dir gerade einen 6-stelligen Code für die Telefonnummer", timeout = sms_timeout) + LOG.warning("############################################") + LOG.warning("# Device verification message detected. Please follow the instruction displayed in the Browser.") + LOG.warning("############################################") + await ainput(_("Press ENTER when done...")) async def _dismiss_consent_banner(self) -> None: """Dismiss the GDPR/TCF consent banner if it is present. 
@@ -1100,65 +1211,39 @@ class KleinanzeigenBot(WebScrapingMixin): # noqa: PLR0904 LOG.debug("Consent banner detected, clicking 'Alle akzeptieren'...") await self.web_click(By.ID, "gdpr-banner-accept") except TimeoutError: - pass # Banner not present; nothing to dismiss + LOG.debug("Consent banner not present; continuing without dismissal") - async def _auth_probe_login_state(self) -> LoginState: - """Probe an auth-required endpoint to classify login state. + async def _check_email_verification(self) -> None: + email_timeout = self._timeout("email_verification") + await self.web_find(By.TEXT, "Um dein Konto zu schützen haben wir dir eine E-Mail geschickt", timeout = email_timeout) + LOG.warning("############################################") + LOG.warning("# Device verification message detected. Please follow the instruction displayed in the Browser.") + LOG.warning("############################################") + await ainput(_("Press ENTER when done...")) - The probe is non-mutating (GET request). It is used as a fallback method by - get_login_state() when DOM-based checks are inconclusive. 
- """ + async def _click_gdpr_banner(self, *, timeout:float | None = None) -> None: + gdpr_timeout = self._timeout("quick_dom") if timeout is None else timeout + await self.web_find(By.ID, "gdpr-banner-accept", timeout = gdpr_timeout) + await self.web_click(By.ID, "gdpr-banner-accept", timeout = gdpr_timeout) - url = f"{self.root_url}/m-meine-anzeigen-verwalten.json?sort=DEFAULT" - try: - response = await self.web_request(url, valid_response_codes = [200, 401, 403]) - except (TimeoutError, AssertionError): - # AssertionError can occur when web_request() fails to parse the response (e.g., unexpected content type) - # Treat both timeout and assertion failures as UNKNOWN to avoid false assumptions about login state - return LoginState.UNKNOWN - - status_code = response.get("statusCode") - if status_code in {401, 403}: - return LoginState.LOGGED_OUT - - content = response.get("content", "") - if not isinstance(content, str): - return LoginState.UNKNOWN - - try: - payload = json.loads(content) - except json.JSONDecodeError: - lowered = content.lower() - if "m-einloggen" in lowered or "login-email" in lowered or "login-password" in lowered or "login-form" in lowered: - return LoginState.LOGGED_OUT - return LoginState.UNKNOWN - - if isinstance(payload, dict) and "ads" in payload: - return LoginState.LOGGED_IN - - return LoginState.UNKNOWN - - async def get_login_state(self) -> LoginState: - """Determine current login state using layered detection. + async def get_login_state(self, *, capture_diagnostics:bool = True) -> LoginState: + """Determine current login state using DOM-first detection.
Order: - 1) DOM-based check via `is_logged_in(include_probe=False)` (preferred - stealthy) - 2) Server-side auth probe via `_auth_probe_login_state` (fallback - more reliable) - 3) If still inconclusive, capture diagnostics via - `_capture_login_detection_diagnostics_if_enabled` and return `UNKNOWN` + 1) DOM-based logged-in check via `is_logged_in(include_probe=False)` + 2) Logged-out CTA check + 3) If inconclusive, optionally capture diagnostics and return `UNKNOWN` """ - # Prefer DOM-based checks first to minimize bot-like behavior. - # The auth probe makes a JSON API request that normal users wouldn't trigger. + # Prefer DOM-based checks first to minimize bot-like behavior and avoid + # fragile API probing side effects. Server-side auth probing was removed. if await self.is_logged_in(include_probe = False): return LoginState.LOGGED_IN - # Fall back to the more reliable server-side auth probe. - # SPA/hydration delays can cause DOM-based checks to temporarily miss login indicators.
- state = await self._auth_probe_login_state() - if state != LoginState.UNKNOWN: - return state + if await self._has_logged_out_cta(log_timeout = False): + return LoginState.LOGGED_OUT - await self._capture_login_detection_diagnostics_if_enabled() + if capture_diagnostics: + await self._capture_login_detection_diagnostics_if_enabled() return LoginState.UNKNOWN def _diagnostics_output_dir(self) -> Path: @@ -1271,8 +1356,27 @@ class KleinanzeigenBot(WebScrapingMixin): # noqa: PLR0904 login_check_timeout, effective_timeout, ) + quick_dom_timeout = self._timeout("quick_dom") tried_login_selectors = _format_login_detection_selectors(_LOGIN_DETECTION_SELECTORS) + try: + user_info, matched_selector = await self.web_text_first_available( + _LOGIN_DETECTION_SELECTORS, + timeout = quick_dom_timeout, + key = "quick_dom", + description = "login_detection(quick_logged_in)", + ) + if username in user_info.lower(): + matched_selector_display = ( + f"{_LOGIN_DETECTION_SELECTORS[matched_selector][0].name}={_LOGIN_DETECTION_SELECTORS[matched_selector][1]}" + if 0 <= matched_selector < len(_LOGIN_DETECTION_SELECTORS) + else f"selector_index_{matched_selector}" + ) + LOG.debug("Login detected via login detection selector '%s'", matched_selector_display) + return True + except TimeoutError: + LOG.debug("No login detected via configured login detection selectors (%s)", tried_login_selectors) + try: user_info, matched_selector = await self.web_text_first_available( _LOGIN_DETECTION_SELECTORS, @@ -1281,32 +1385,60 @@ class KleinanzeigenBot(WebScrapingMixin): # noqa: PLR0904 description = "login_detection(selector_group)", ) if username in user_info.lower(): - matched_selector_label = ( - _LOGIN_DETECTION_SELECTOR_LABELS[matched_selector] - if 0 <= matched_selector < len(_LOGIN_DETECTION_SELECTOR_LABELS) + matched_selector_display = ( + f"{_LOGIN_DETECTION_SELECTORS[matched_selector][0].name}={_LOGIN_DETECTION_SELECTORS[matched_selector][1]}" + if 0 <= matched_selector < 
len(_LOGIN_DETECTION_SELECTORS) else f"selector_index_{matched_selector}" ) - LOG.debug("Login detected via login detection selector '%s'", matched_selector_label) + LOG.debug("Login detected via login detection selector '%s'", matched_selector_display) return True except TimeoutError: LOG.debug("Timeout waiting for login detection selector group after %.1fs", effective_timeout) - if not include_probe: - LOG.debug("No login detected via configured login detection selectors (%s)", tried_login_selectors) + if await self._has_logged_out_cta(): return False - state = await self._auth_probe_login_state() - if state == LoginState.LOGGED_IN: - return True + if include_probe: + LOG.debug("No login detected via configured login detection selectors (%s); auth probe is disabled", tried_login_selectors) + return False - LOG.debug( - "No login detected - DOM login detection selectors (%s) did not confirm login and server probe returned %s", - tried_login_selectors, - state.name, - ) + LOG.debug("No login detected via configured login detection selectors (%s)", tried_login_selectors) return False - async def _fetch_published_ads(self) -> list[dict[str, Any]]: + async def _has_logged_out_cta(self, *, log_timeout:bool = True) -> bool: + quick_dom_timeout = self._timeout("quick_dom") + tried_logged_out_selectors = _format_login_detection_selectors(_LOGGED_OUT_CTA_SELECTORS) + + try: + cta_element, cta_index = await self.web_find_first_available( + _LOGGED_OUT_CTA_SELECTORS, + timeout = quick_dom_timeout, + key = "quick_dom", + description = "login_detection(logged_out_cta)", + ) + cta_text = await self._extract_visible_text(cta_element) + if cta_text.strip(): + matched_selector_display = ( + f"{_LOGGED_OUT_CTA_SELECTORS[cta_index][0].name}={_LOGGED_OUT_CTA_SELECTORS[cta_index][1]}" + if 0 <= cta_index < len(_LOGGED_OUT_CTA_SELECTORS) + else f"selector_index_{cta_index}" + ) + if 0 <= cta_index < len(_LOGGED_OUT_CTA_SELECTORS): + LOG.debug("Fast logged-out pre-check matched selector 
'%s'", matched_selector_display) + return True + LOG.debug("Fast logged-out pre-check got unexpected selector index '%s'; failing closed", cta_index) + return False + except TimeoutError: + if log_timeout: + LOG.debug( + "Fast logged-out pre-check found no login CTA (%s) within %.1fs", + tried_logged_out_selectors, + quick_dom_timeout, + ) + + return False + + async def _fetch_published_ads(self, *, strict:bool = False) -> list[dict[str, Any]]: """Fetch all published ads, handling API pagination. Returns: @@ -1326,37 +1458,84 @@ class KleinanzeigenBot(WebScrapingMixin): # noqa: PLR0904 try: response = await self.web_request(f"{self.root_url}/m-meine-anzeigen-verwalten.json?sort=DEFAULT&pageNum={page}") except TimeoutError as ex: - LOG.warning("Pagination request timed out on page %s: %s", page, ex) + if strict: + raise + LOG.warning("Pagination request failed on page %s: %s", page, ex) + break + + if not isinstance(response, dict): + if strict: + raise TypeError(f"Unexpected pagination response type on page {page}: {type(response).__name__}") + LOG.warning("Unexpected pagination response type on page %s: %s", page, type(response).__name__) break content = response.get("content", "") + if isinstance(content, bytearray): + content = bytes(content) + if isinstance(content, bytes): + content = content.decode("utf-8", errors = "replace") + if not isinstance(content, str): + if strict: + raise TypeError(f"Unexpected response content type on page {page}: {type(content).__name__}") + LOG.warning("Unexpected response content type on page %s: %s", page, type(content).__name__) + break + try: json_data = json.loads(content) - except json.JSONDecodeError as ex: + except (json.JSONDecodeError, TypeError) as ex: if not content: + if strict: + raise ValueError(f"Empty JSON response content on page {page}") from ex LOG.warning("Empty JSON response content on page %s", page) break + if strict: + raise ValueError(f"Failed to parse JSON response on page {page}: {ex}") from ex snippet 
= content[:SNIPPET_LIMIT] + ("..." if len(content) > SNIPPET_LIMIT else "") LOG.warning("Failed to parse JSON response on page %s: %s (content: %s)", page, ex, snippet) break if not isinstance(json_data, dict): + if strict: + raise TypeError(f"Unexpected JSON payload type on page {page}: {type(json_data).__name__}") snippet = content[:SNIPPET_LIMIT] + ("..." if len(content) > SNIPPET_LIMIT else "") LOG.warning("Unexpected JSON payload on page %s (content: %s)", page, snippet) break page_ads = json_data.get("ads", []) if not isinstance(page_ads, list): + if strict: + raise TypeError(f"Unexpected 'ads' type on page {page}: {type(page_ads).__name__}") preview = str(page_ads) if len(preview) > SNIPPET_LIMIT: preview = preview[:SNIPPET_LIMIT] + "..." LOG.warning("Unexpected 'ads' type on page %s: %s value: %s", page, type(page_ads).__name__, preview) break - ads.extend(page_ads) + filtered_page_ads:list[dict[str, Any]] = [] + rejected_count = 0 + rejected_preview:str | None = None + for entry in page_ads: + if isinstance(entry, dict): + filtered_page_ads.append(entry) + continue + rejected_count += 1 + if strict: + raise TypeError(f"Unexpected ad entry type on page {page}: {type(entry).__name__}") + if rejected_preview is None: + rejected_preview = repr(entry) + + if rejected_count > 0: + preview = rejected_preview or "" + if len(preview) > SNIPPET_LIMIT: + preview = preview[:SNIPPET_LIMIT] + "..." 
+ LOG.warning("Filtered %s malformed ad entries on page %s (sample: %s)", rejected_count, page, preview) + + ads.extend(filtered_page_ads) paging = json_data.get("paging") if not isinstance(paging, dict): + if strict: + raise ValueError(f"Missing or invalid paging info on page {page}: {type(paging).__name__}") LOG.debug("No paging dict found on page %s, assuming single page", page) break @@ -1365,10 +1544,14 @@ class KleinanzeigenBot(WebScrapingMixin): # noqa: PLR0904 total_pages = misc.coerce_page_number(paging.get("last")) if current_page_num is None: + if strict: + raise ValueError(f"Invalid 'pageNum' in paging info: {paging.get('pageNum')}") LOG.warning("Invalid 'pageNum' in paging info: %s, stopping pagination", paging.get("pageNum")) break if total_pages is None: + if strict: + raise ValueError("No pagination info found") LOG.debug("No pagination info found, assuming single page") break @@ -1387,6 +1570,8 @@ class KleinanzeigenBot(WebScrapingMixin): # noqa: PLR0904 # Use API's next field for navigation (more robust than our counter) next_page = misc.coerce_page_number(paging.get("next")) if next_page is None: + if strict: + raise ValueError(f"Invalid 'next' page value in paging info: {paging.get('next')}") LOG.warning("Invalid 'next' page value in paging info: %s, stopping pagination", paging.get("next")) break page = next_page @@ -1554,6 +1739,28 @@ class KleinanzeigenBot(WebScrapingMixin): # noqa: PLR0904 # Check for success messages return await self.web_check(By.ID, "checking-done", Is.DISPLAYED) or await self.web_check(By.ID, "not-completed", Is.DISPLAYED) + async def _detect_new_published_ad_ids(self, ads_before_publish:set[str], ad_title:str) -> set[str] | None: + try: + current_ads = await self._fetch_published_ads(strict = True) + current_ad_ids:set[str] = set() + for current_ad in current_ads: + if not isinstance(current_ad, dict): + # Keep duplicate-prevention verification fail-closed: malformed entries + # must abort retries rather than risk 
creating duplicate listings. + entry_length = len(current_ad) if hasattr(current_ad, "__len__") else None + LOG.debug("Malformed ad entry in strict duplicate verification: type=%s length=%s", type(current_ad).__name__, entry_length) + raise TypeError(f"Unexpected ad entry type: {type(current_ad).__name__}") + if current_ad.get("id"): + current_ad_ids.add(str(current_ad["id"])) + except Exception as ex: # noqa: BLE001 + LOG.warning( + "Could not verify published ads after failed attempt for '%s': %s -- aborting retries to prevent duplicates.", + ad_title, + ex, + ) + return None + return current_ad_ids - ads_before_publish + async def publish_ads(self, ad_cfgs:list[tuple[str, Ad, dict[str, Any]]]) -> None: count = 0 failed_count = 0 @@ -1589,34 +1796,33 @@ class KleinanzeigenBot(WebScrapingMixin): # noqa: PLR0904 raise # Respect task cancellation except (TimeoutError, ProtocolException) as ex: await self._capture_publish_error_diagnostics_if_enabled(ad_cfg, ad_cfg_orig, ad_file, attempt, ex) - if attempt < max_retries: - # Before retrying, check if the ad was already created despite the error. - # A partially successful submission followed by a retry would create a duplicate listing, - # which violates kleinanzeigen.de terms of service and can lead to account suspension. - try: - current_ads = await self._fetch_published_ads() - current_ad_ids = {str(x["id"]) for x in current_ads if x.get("id")} - new_ad_ids = current_ad_ids - ads_before_publish - if new_ad_ids: - LOG.warning( - "Attempt %s/%s failed for '%s': %s. 
" - "However, a new ad was detected (id: %s) -- aborting retries to prevent duplicates.", - attempt, max_retries, ad_cfg.title, ex, ", ".join(new_ad_ids) - ) - failed_count += 1 - break - except Exception as verify_ex: # noqa: BLE001 - LOG.warning( - "Could not verify published ads after failed attempt for '%s': %s -- aborting retries to prevent duplicates.", - ad_cfg.title, verify_ex, - ) - failed_count += 1 - break - LOG.warning("Attempt %s/%s failed for '%s': %s. Retrying...", attempt, max_retries, ad_cfg.title, ex) - await self.web_sleep(2) # Wait before retry - else: + if attempt >= max_retries: LOG.error("All %s attempts failed for '%s': %s. Skipping ad.", max_retries, ad_cfg.title, ex) failed_count += 1 + continue + + # Before retrying, check if the ad was already created despite the error. + # A partially successful submission followed by a retry would create a duplicate listing, + # which violates kleinanzeigen.de terms of service and can lead to account suspension. + new_ad_ids = await self._detect_new_published_ad_ids(ads_before_publish, ad_cfg.title) + if new_ad_ids is None: + failed_count += 1 + break + if new_ad_ids: + LOG.warning( + "Attempt %s/%s failed for '%s': %s. " + "However, a new ad was detected (id: %s) -- aborting retries to prevent duplicates.", + attempt, + max_retries, + ad_cfg.title, + ex, + ", ".join(new_ad_ids), + ) + failed_count += 1 + break + + LOG.warning("Attempt %s/%s failed for '%s': %s. Retrying...", attempt, max_retries, ad_cfg.title, ex) + await self.web_sleep(2_000) # Wait before retry # Check publishing result separately (no retry - ad is already submitted) if success: @@ -1640,10 +1846,10 @@ class KleinanzeigenBot(WebScrapingMixin): # noqa: PLR0904 self, ad_file:str, ad_cfg:Ad, ad_cfg_orig:dict[str, Any], published_ads:list[dict[str, Any]], mode:AdUpdateStrategy = AdUpdateStrategy.REPLACE ) -> None: """ - @param ad_cfg: the effective ad config (i.e. with default values applied etc.) 
- @param ad_cfg_orig: the ad config as present in the YAML file - @param published_ads: json list of published ads - @param mode: the mode of ad editing, either publishing a new or updating an existing ad + @param ad_cfg: the effective ad config (i.e. with default values applied etc.) + @param ad_cfg_orig: the ad config as present in the YAML file + @param published_ads: json list of published ads + @param mode: the mode of ad editing, either publishing a new or updating an existing ad """ if mode == AdUpdateStrategy.REPLACE: @@ -2256,7 +2462,7 @@ class KleinanzeigenBot(WebScrapingMixin): # noqa: PLR0904 async def download_ads(self) -> None: """ Determines which download mode was chosen with the arguments, and calls the specified download routine. - This downloads either all, only unsaved (new), or specific ads given by ID. + This downloads either all, only unsaved (new), or specific ads given by ID. """ # Fetch published ads once from manage-ads JSON to avoid repetitive API calls during extraction # Build lookup dict inline and pass directly to extractor (no cache abstraction needed) @@ -2345,10 +2551,10 @@ class KleinanzeigenBot(WebScrapingMixin): # noqa: PLR0904 def __get_description(self, ad_cfg:Ad, *, with_affixes:bool) -> str: """Get the ad description optionally with prefix and suffix applied. - Precedence (highest to lowest): - 1. Direct ad-level affixes (description_prefix/suffix) - 2. Global flattened affixes (ad_defaults.description_prefix/suffix) - 3. Legacy global nested affixes (ad_defaults.description.prefix/suffix) + Precedence (highest to lowest): + 1. Direct ad-level affixes (description_prefix/suffix) + 2. Global flattened affixes (ad_defaults.description_prefix/suffix) + 3.
Legacy global nested affixes (ad_defaults.description.prefix/suffix) Args: ad_cfg: The ad configuration dictionary @@ -2420,8 +2626,8 @@ def main(args:list[str]) -> None: print( textwrap.dedent(rf""" _ _ _ _ _ _ - | | _| | ___(_)_ __ __ _ _ __ _______(_) __ _ ___ _ __ | |__ ___ | |_ - | |/ / |/ _ \ | '_ \ / _` | '_ \|_ / _ \ |/ _` |/ _ \ '_ \ ____| '_ \ / _ \| __| + | | _ | | ___(_)_ __ __ _ _ __ _______(_) __ _ ___ _ __ | |__ ___ | |_ + | | / / | / _ \ | '_ \ / _` | '_ \|_ / _ \ |/ _` |/ _ \ '_ \ ____| '_ \ / _ \| __| | <| | __/ | | | | (_| | | | |/ / __/ | (_| | __/ | | |____| |_) | (_) | |_ |_|\_\_|\___|_|_| |_|\__,_|_| |_/___\___|_|\__, |\___|_| |_| |_.__/ \___/ \__| |___/ diff --git a/src/kleinanzeigen_bot/resources/translations.de.yaml b/src/kleinanzeigen_bot/resources/translations.de.yaml index d38e905..383b4b9 100644 --- a/src/kleinanzeigen_bot/resources/translations.de.yaml +++ b/src/kleinanzeigen_bot/resources/translations.de.yaml @@ -37,9 +37,12 @@ kleinanzeigen_bot/__init__.py: "Empty JSON response content on page %s": "Leerer JSON-Antwortinhalt auf Seite %s" "Failed to parse JSON response on page %s: %s (content: %s)": "Fehler beim Parsen der JSON-Antwort auf Seite %s: %s (Inhalt: %s)" "Stopping pagination after %s pages to avoid infinite loop": "Stoppe die Seitenaufschaltung nach %s Seiten, um eine Endlosschleife zu vermeiden" - "Pagination request timed out on page %s: %s": "Zeitueberschreitung bei der Seitenabfrage auf Seite %s: %s" + "Pagination request failed on page %s: %s": "Seitenabfrage auf Seite %s fehlgeschlagen: %s" + "Unexpected pagination response type on page %s: %s": "Unerwarteter Typ der Paginierungsantwort auf Seite %s: %s" + "Unexpected response content type on page %s: %s": "Unerwarteter Antwortinhalt-Typ auf Seite %s: %s" "Unexpected JSON payload on page %s (content: %s)": "Unerwartete JSON-Antwort auf Seite %s (Inhalt: %s)" "Unexpected 'ads' type on page %s: %s value: %s": "Unerwarteter 'ads'-Typ auf Seite %s: %s Wert: %s" +
"Filtered %s malformed ad entries on page %s (sample: %s)": "%s fehlerhafte Anzeigen-Einträge auf Seite %s gefiltert (Beispiel: %s)" "Reached last page %s of %s, stopping pagination": "Letzte Seite %s von %s erreicht, beende Paginierung" "No ads found on page %s, stopping pagination": "Keine Anzeigen auf Seite %s gefunden, beende Paginierung" "Invalid 'next' page value in paging info: %s, stopping pagination": "Ungültiger 'next'-Seitenwert in Paginierungsinfo: %s, beende Paginierung" @@ -86,14 +89,36 @@ kleinanzeigen_bot/__init__.py: login: "Checking if already logged in...": "Überprüfe, ob bereits eingeloggt..." - "Current page URL after opening homepage: %s": "Aktuelle Seiten-URL nach dem Öffnen der Startseite: %s" - "Already logged in as [%s]. Skipping login.": "Bereits eingeloggt als [%s]. Überspringe Anmeldung." - "Opening login page...": "Öffne Anmeldeseite..." - "Login state is UNKNOWN - cannot determine if already logged in. Skipping login attempt.": "Login-Status ist UNKNOWN - kann nicht bestimmt werden, ob bereits eingeloggt ist. Überspringe Anmeldeversuch." - "Login state is UNKNOWN after first login attempt - cannot determine login status. Aborting login process.": "Login-Status ist UNKNOWN nach dem ersten Anmeldeversuch - kann Login-Status nicht bestimmen. Breche Anmeldeprozess ab." - "First login attempt did not succeed, trying second login attempt": "Erster Anmeldeversuch war nicht erfolgreich, versuche zweiten Anmeldeversuch" - "Second login attempt succeeded": "Zweiter Anmeldeversuch erfolgreich" - "Second login attempt also failed - login may not have succeeded": "Zweiter Anmeldeversuch ebenfalls fehlgeschlagen - Anmeldung möglicherweise nicht erfolgreich" + "Already logged in. Skipping login.": "Bereits eingeloggt. Überspringe Anmeldung." + "Navigating to SSO login page (Auth0)...": "Navigiere zur SSO-Anmeldeseite (Auth0)..." 
+ "Timeout navigating to SSO login page after %.1fs": "Zeitüberschreitung beim Navigieren zur SSO-Anmeldeseite nach %.1fs" + "Login confirmed.": "Anmeldung bestätigt." + "Login state after attempt is %s (url=%s)": "Login-Status nach dem Versuch ist %s (URL=%s)" + "Login could not be confirmed after Auth0 flow (state=%s, url=%s)": "Anmeldung nach Auth0-Flow konnte nicht bestätigt werden (Status=%s, URL=%s)" + + _wait_for_auth0_login_context: + "Auth0 redirect not detected (url=%s)": "Auth0-Weiterleitung nicht erkannt (URL=%s)" + + _wait_for_auth0_password_step: + "Auth0 password step not reached (url=%s)": "Auth0-Passwortschritt nicht erreicht (URL=%s)" + + _wait_for_post_auth0_submit_transition: + "Auth0 post-submit verification remained inconclusive (url=%s)": "Auth0-Verifikation nach Absenden blieb unklar (URL=%s)" + + fill_login_data_and_send: + "Logging in...": "Anmeldung..." + "Auth0 Step 1: entering email...": "Auth0 Schritt 1: E-Mail wird eingegeben..." + "Waiting for Auth0 password page...": "Warte auf Auth0-Passwortseite..." + "Auth0 Step 2: entering password...": "Auth0 Schritt 2: Passwort wird eingegeben..." + "Auth0 login submitted.": "Auth0-Anmeldung abgesendet." + + _check_sms_verification: + "# Device verification message detected. Please follow the instruction displayed in the Browser.": "# Nachricht zur Geräteverifizierung erkannt. Bitte den Anweisungen im Browser folgen." + "Press ENTER when done...": "EINGABETASTE drücken, wenn erledigt..." + + _check_email_verification: + "# Device verification message detected. Please follow the instruction displayed in the Browser.": "# Nachricht zur Geräteverifizierung erkannt. Bitte den Anweisungen im Browser folgen." + "Press ENTER when done...": "EINGABETASTE drücken, wenn erledigt..." 
is_logged_in: "Starting login detection (timeout: %.1fs base, %.1fs effective with multiplier/backoff)": "Starte Login-Erkennung (Timeout: %.1fs Basis, %.1fs effektiv mit Multiplikator/Backoff)" @@ -101,8 +126,6 @@ kleinanzeigen_bot/__init__.py: "Timeout waiting for login detection selector group after %.1fs": "Timeout beim Warten auf die Login-Erkennungs-Selektorgruppe nach %.1fs" handle_after_login_logic: - "# Device verification message detected. Please follow the instruction displayed in the Browser.": "# Nachricht zur Geräteverifizierung erkannt. Bitte den Anweisungen im Browser folgen." - "Press ENTER when done...": "EINGABETASTE drücken, wenn erledigt..." "Handling GDPR disclaimer...": "Verarbeite DSGVO-Hinweis..." delete_ads: @@ -156,11 +179,14 @@ kleinanzeigen_bot/__init__.py: "Attempt %s/%s failed for '%s': %s. Retrying...": "Versuch %s/%s fehlgeschlagen für '%s': %s. Erneuter Versuch..." "Attempt %s/%s failed for '%s': %s. However, a new ad was detected (id: %s) -- aborting retries to prevent duplicates.": "Versuch %s/%s fehlgeschlagen für '%s': %s. Jedoch wurde eine neue Anzeige erkannt (ID: %s) -- Wiederholungen werden abgebrochen, um Duplikate zu vermeiden." "Could not fetch fresh published-ads baseline for '%s': %s. Falling back to initial snapshot.": "Konnte keine aktuelle Anzeigen-Baseline für '%s' abrufen: %s. Verwende initialen Snapshot." - "Could not verify published ads after failed attempt for '%s': %s -- aborting retries to prevent duplicates.": "Veröffentlichte Anzeigen konnten nach fehlgeschlagenem Versuch für '%s' nicht geprüft werden: %s -- Wiederholungen werden abgebrochen, um Duplikate zu vermeiden." "All %s attempts failed for '%s': %s. Skipping ad.": "Alle %s Versuche fehlgeschlagen für '%s': %s. Überspringe Anzeige." 
"DONE: (Re-)published %s (%s failed after retries)": "FERTIG: %s (erneut) veröffentlicht (%s fehlgeschlagen nach Wiederholungen)" "DONE: (Re-)published %s": "FERTIG: %s (erneut) veröffentlicht" "ad": "Anzeige" + + _detect_new_published_ad_ids: + "Could not verify published ads after failed attempt for '%s': %s -- aborting retries to prevent duplicates.": "Veröffentlichte Anzeigen konnten nach fehlgeschlagenem Versuch für '%s' nicht geprüft werden: %s -- Wiederholungen werden abgebrochen, um Duplikate zu vermeiden." + apply_auto_price_reduction: "Auto price reduction is enabled for [%s] but no price is configured.": "Automatische Preisreduzierung ist für [%s] aktiviert, aber es wurde kein Preis konfiguriert." "Auto price reduction is enabled for [%s] but min_price equals price (%s) - no reductions will occur.": "Automatische Preisreduzierung ist für [%s] aktiviert, aber min_price entspricht dem Preis (%s) - es werden keine Reduktionen auftreten." @@ -264,9 +290,6 @@ kleinanzeigen_bot/__init__.py: "Unknown command: %s": "Unbekannter Befehl: %s" "Timing collector flush failed: %s": "Zeitmessdaten konnten nicht gespeichert werden: %s" - fill_login_data_and_send: - "Logging in as [%s]...": "Anmeldung als [%s]..." - __set_shipping: "Unable to close shipping dialog!": "Versanddialog konnte nicht geschlossen werden!" 
diff --git a/tests/unit/test_init.py b/tests/unit/test_init.py index b7546ef..866ade4 100644 --- a/tests/unit/test_init.py +++ b/tests/unit/test_init.py @@ -1,7 +1,7 @@ # SPDX-FileCopyrightText: © Jens Bergmann and contributors # SPDX-License-Identifier: AGPL-3.0-or-later # SPDX-ArtifactOfProjectHomePage: https://github.com/Second-Hand-Friends/kleinanzeigen-bot/ -import copy, fnmatch, io, json, logging, os, tempfile # isort: skip +import asyncio, copy, fnmatch, io, json, logging, os, tempfile # isort: skip from collections.abc import Callable, Generator from contextlib import redirect_stdout from datetime import timedelta @@ -442,7 +442,12 @@ class TestKleinanzeigenBotAuthentication: @pytest.mark.asyncio async def test_is_logged_in_returns_true_when_logged_in(self, test_bot:KleinanzeigenBot) -> None: """Verify that login check returns true when logged in.""" - with patch.object(test_bot, "web_text_first_available", new_callable = AsyncMock, return_value = ("Welcome dummy_user", 0)): + with patch.object( + test_bot, + "web_text_first_available", + new_callable = AsyncMock, + return_value = ("Welcome dummy_user", 0), + ): assert await test_bot.is_logged_in() is True @pytest.mark.asyncio @@ -460,45 +465,96 @@ class TestKleinanzeigenBotAuthentication: async def test_is_logged_in_returns_false_when_not_logged_in(self, test_bot:KleinanzeigenBot) -> None: """Verify that login check returns false when not logged in.""" with ( - patch.object(test_bot, "web_text_first_available", side_effect = TimeoutError), patch.object( test_bot, - "web_request", + "web_text_first_available", new_callable = AsyncMock, - return_value = {"statusCode": 200, "content": "login"}, + side_effect = [("nicht-eingeloggt", 0), ("kein user signal", 0)], ), + patch.object(test_bot, "_has_logged_out_cta", new_callable = AsyncMock, return_value = False), ): assert await test_bot.is_logged_in() is False @pytest.mark.asyncio - async def test_is_logged_in_uses_selector_group_timeout_key(self, 
test_bot:KleinanzeigenBot) -> None: - """Verify login detection uses selector-group lookup with login_detection timeout key.""" - with patch.object(test_bot, "web_text_first_available", new_callable = AsyncMock, return_value = ("Welcome dummy_user", 0)) as group_text: - assert await test_bot.is_logged_in(include_probe = False) is True - - group_text.assert_awaited_once() - call_args = group_text.await_args - assert call_args is not None - assert call_args.args[0] == [(By.CLASS_NAME, "mr-medium"), (By.ID, "user-email")] - assert call_args.kwargs["key"] == "login_detection" - assert call_args.kwargs["timeout"] == test_bot._timeout("login_detection") + async def test_has_logged_out_cta_requires_visible_candidate(self, test_bot:KleinanzeigenBot) -> None: + matched_element = MagicMock(spec = Element) + with ( + patch.object(test_bot, "web_find_first_available", new_callable = AsyncMock, return_value = (matched_element, 0)), + patch.object(test_bot, "_extract_visible_text", new_callable = AsyncMock, return_value = ""), + ): + assert await test_bot._has_logged_out_cta() is False @pytest.mark.asyncio - async def test_is_logged_in_logs_selector_label_without_raw_selector_literals( + async def test_has_logged_out_cta_accepts_visible_candidate(self, test_bot:KleinanzeigenBot) -> None: + matched_element = MagicMock(spec = Element) + with ( + patch.object(test_bot, "web_find_first_available", new_callable = AsyncMock, return_value = (matched_element, 0)), + patch.object(test_bot, "_extract_visible_text", new_callable = AsyncMock, return_value = "Einloggen"), + ): + assert await test_bot._has_logged_out_cta() is True + + @pytest.mark.asyncio + async def test_is_logged_in_uses_selector_group_timeout_key(self, test_bot:KleinanzeigenBot) -> None: + """Verify login detection uses selector-group lookup with login_detection timeout key.""" + with patch.object( + test_bot, + "web_text_first_available", + new_callable = AsyncMock, + side_effect = [TimeoutError(), ("Welcome dummy_user", 
0)], + ) as group_text: + assert await test_bot.is_logged_in(include_probe = False) is True + + group_text.assert_awaited() + assert any(call.kwargs.get("timeout") == test_bot._timeout("login_detection") for call in group_text.await_args_list) + + @pytest.mark.asyncio + async def test_is_logged_in_runs_full_selector_group_before_cta_precheck(self, test_bot:KleinanzeigenBot) -> None: + """Quick CTA checks must not short-circuit before full logged-in selector checks.""" + with patch.object( + test_bot, + "web_text_first_available", + new_callable = AsyncMock, + side_effect = [TimeoutError(), ("Welcome dummy_user", 0)], + ) as group_text: + assert await test_bot.is_logged_in(include_probe = False) is True + + group_text.assert_awaited() + assert group_text.await_count >= 1 + + @pytest.mark.asyncio + async def test_is_logged_in_short_circuits_before_cta_check_when_quick_user_signal_matches(self, test_bot:KleinanzeigenBot) -> None: + """Logged-in quick pre-check should win even if incidental login links exist elsewhere.""" + with patch.object( + test_bot, + "web_text_first_available", + new_callable = AsyncMock, + return_value = ("angemeldet als: dummy_user", 0), + ) as group_text: + assert await test_bot.is_logged_in(include_probe = False) is True + + group_text.assert_awaited() + assert group_text.await_count >= 1 + + @pytest.mark.asyncio + async def test_is_logged_in_logs_matched_raw_selector( self, test_bot:KleinanzeigenBot, caplog:pytest.LogCaptureFixture ) -> None: - """Login detection logs should reference stable labels, not raw selector values.""" + """Login detection logs should show the matched raw selector.""" caplog.set_level("DEBUG") with ( caplog.at_level("DEBUG"), - patch.object(test_bot, "web_text_first_available", new_callable = AsyncMock, return_value = ("angemeldet als: dummy_user", 1)), + patch.object( + test_bot, + "web_text_first_available", + new_callable = AsyncMock, + return_value = ("angemeldet als: dummy_user", 0), + ), ): assert await 
test_bot.is_logged_in(include_probe = False) is True - assert "Login detected via login detection selector 'user_info_secondary'" in caplog.text - for forbidden in (".mr-medium", "#user-email", "mr-medium", "user-email"): - assert forbidden not in caplog.text + assert "Login detected via login detection selector" in caplog.text + assert "CLASS_NAME=mr-medium" in caplog.text @pytest.mark.asyncio async def test_is_logged_in_logs_generic_message_when_selector_group_does_not_match( @@ -509,78 +565,87 @@ class TestKleinanzeigenBotAuthentication: with ( caplog.at_level("DEBUG"), - patch.object(test_bot, "web_text_first_available", side_effect = TimeoutError), + patch.object(test_bot, "web_text_first_available", side_effect = [TimeoutError(), TimeoutError()]), + patch.object(test_bot, "_has_logged_out_cta", new_callable = AsyncMock, return_value = False), ): assert await test_bot.is_logged_in(include_probe = False) is False - assert any( - record.message == "No login detected via configured login detection selectors (CLASS_NAME=mr-medium, ID=user-email)" - for record in caplog.records - ) + assert "No login detected via configured login detection selectors" in caplog.text + assert "CLASS_NAME=mr-medium" in caplog.text + assert "ID=user-email" in caplog.text @pytest.mark.asyncio - async def test_is_logged_in_logs_raw_selectors_when_probe_reports_logged_out( + async def test_is_logged_in_logs_raw_selectors_when_dom_checks_fail_and_probe_disabled( self, test_bot:KleinanzeigenBot, caplog:pytest.LogCaptureFixture ) -> None: - """Probe-based final failure should include the tried raw selectors for debugging.""" + """Final failure should report selectors and disabled-probe state.""" caplog.set_level("DEBUG") with ( caplog.at_level("DEBUG"), - patch.object(test_bot, "web_text_first_available", side_effect = TimeoutError), - patch.object(test_bot, "_auth_probe_login_state", new_callable = AsyncMock, return_value = LoginState.LOGGED_OUT), + patch.object(test_bot, 
"web_text_first_available", side_effect = [TimeoutError(), TimeoutError()]), + patch.object(test_bot, "_has_logged_out_cta", new_callable = AsyncMock, return_value = False), ): assert await test_bot.is_logged_in() is False - assert any( - record.message == ( - "No login detected - DOM login detection selectors (CLASS_NAME=mr-medium, ID=user-email) " - "did not confirm login and server probe returned LOGGED_OUT" - ) - for record in caplog.records - ) + assert "No login detected via configured login detection selectors" in caplog.text + assert "auth probe is disabled" in caplog.text @pytest.mark.asyncio - async def test_get_login_state_prefers_dom_over_auth_probe(self, test_bot:KleinanzeigenBot) -> None: + async def test_get_login_state_prefers_dom_checks(self, test_bot:KleinanzeigenBot) -> None: with ( - patch.object(test_bot, "web_text_first_available", new_callable = AsyncMock, return_value = ("Welcome dummy_user", 0)) as web_text, patch.object( - test_bot, "_auth_probe_login_state", new_callable = AsyncMock, side_effect = AssertionError("Probe must not run when DOM is deterministic") - ) as probe, + test_bot, + "web_text_first_available", + new_callable = AsyncMock, + return_value = ("Welcome dummy_user", 0), + ) as web_text, ): assert await test_bot.get_login_state() == LoginState.LOGGED_IN web_text.assert_awaited_once() - probe.assert_not_called() + + def test_current_page_url_strips_query_and_fragment(self, test_bot:KleinanzeigenBot) -> None: + page = MagicMock() + page.url = "https://login.kleinanzeigen.de/u/login/password?state=secret&code=abc#frag" + test_bot.page = page + + assert test_bot._current_page_url() == "https://login.kleinanzeigen.de/u/login/password" + + def test_is_valid_post_auth0_destination_filters_invalid_urls(self, test_bot:KleinanzeigenBot) -> None: + assert test_bot._is_valid_post_auth0_destination("https://www.kleinanzeigen.de/") is True + assert 
test_bot._is_valid_post_auth0_destination("https://www.kleinanzeigen.de/m-meine-anzeigen.html") is True + assert test_bot._is_valid_post_auth0_destination("https://foo.kleinanzeigen.de/") is True + assert test_bot._is_valid_post_auth0_destination("unknown") is False + assert test_bot._is_valid_post_auth0_destination("about:blank") is False + assert test_bot._is_valid_post_auth0_destination("https://evilkleinanzeigen.de/") is False + assert test_bot._is_valid_post_auth0_destination("https://kleinanzeigen.de.evil.com/") is False + assert test_bot._is_valid_post_auth0_destination("https://login.kleinanzeigen.de/u/login/password") is False + assert test_bot._is_valid_post_auth0_destination("https://www.kleinanzeigen.de/login-error-500") is False @pytest.mark.asyncio - async def test_get_login_state_falls_back_to_auth_probe_when_dom_inconclusive(self, test_bot:KleinanzeigenBot) -> None: + async def test_get_login_state_returns_unknown_when_dom_checks_are_inconclusive(self, test_bot:KleinanzeigenBot) -> None: with ( - patch.object(test_bot, "web_text_first_available", side_effect = TimeoutError) as web_text, - patch.object(test_bot, "_auth_probe_login_state", new_callable = AsyncMock, return_value = LoginState.LOGGED_IN) as probe, - ): - assert await test_bot.get_login_state() == LoginState.LOGGED_IN - web_text.assert_awaited_once() - probe.assert_awaited_once() - - @pytest.mark.asyncio - async def test_get_login_state_falls_back_to_auth_probe_when_dom_logged_out(self, test_bot:KleinanzeigenBot) -> None: - with ( - patch.object(test_bot, "web_text_first_available", side_effect = TimeoutError) as web_text, - patch.object(test_bot, "_auth_probe_login_state", new_callable = AsyncMock, return_value = LoginState.LOGGED_OUT) as probe, - ): - assert await test_bot.get_login_state() == LoginState.LOGGED_OUT - web_text.assert_awaited_once() - probe.assert_awaited_once() - - @pytest.mark.asyncio - async def 
test_get_login_state_returns_unknown_when_probe_unknown_and_dom_inconclusive(self, test_bot:KleinanzeigenBot) -> None: - with ( - patch.object(test_bot, "_auth_probe_login_state", new_callable = AsyncMock, return_value = LoginState.UNKNOWN) as probe, - patch.object(test_bot, "web_text_first_available", side_effect = TimeoutError) as web_text, + patch.object(test_bot, "web_text_first_available", side_effect = [TimeoutError(), TimeoutError()]) as web_text, + patch.object(test_bot, "web_find_first_available", side_effect = TimeoutError()) as cta_find, ): assert await test_bot.get_login_state() == LoginState.UNKNOWN - probe.assert_awaited_once() - web_text.assert_awaited_once() + assert web_text.await_count == 2 + assert cta_find.await_count == 2 + + @pytest.mark.asyncio + async def test_get_login_state_returns_logged_out_when_cta_detected(self, test_bot:KleinanzeigenBot) -> None: + matched_element = MagicMock(spec = Element) + with ( + patch.object( + test_bot, + "web_text_first_available", + side_effect = [TimeoutError(), TimeoutError()], + ) as web_text, + patch.object(test_bot, "web_find_first_available", new_callable = AsyncMock, return_value = (matched_element, 0)), + patch.object(test_bot, "_extract_visible_text", new_callable = AsyncMock, return_value = "Hier einloggen"), + ): + assert await test_bot.get_login_state() == LoginState.LOGGED_OUT + assert web_text.await_count == 2 @pytest.mark.asyncio async def test_get_login_state_unknown_captures_diagnostics_when_enabled(self, test_bot:KleinanzeigenBot, tmp_path:Path) -> None: @@ -592,8 +657,8 @@ class TestKleinanzeigenBotAuthentication: test_bot.page = page with ( - patch.object(test_bot, "_auth_probe_login_state", new_callable = AsyncMock, return_value = LoginState.UNKNOWN), - patch.object(test_bot, "web_text_first_available", side_effect = TimeoutError), + patch.object(test_bot, "web_text_first_available", side_effect = [TimeoutError(), TimeoutError(), TimeoutError(), TimeoutError()]), + patch.object(test_bot, 
"web_find_first_available", side_effect = TimeoutError()), ): assert await test_bot.get_login_state() == LoginState.UNKNOWN @@ -610,8 +675,8 @@ class TestKleinanzeigenBotAuthentication: test_bot.page = page with ( - patch.object(test_bot, "_auth_probe_login_state", new_callable = AsyncMock, return_value = LoginState.UNKNOWN), - patch.object(test_bot, "web_text_first_available", side_effect = TimeoutError), + patch.object(test_bot, "web_text_first_available", side_effect = [TimeoutError(), TimeoutError(), TimeoutError(), TimeoutError()]), + patch.object(test_bot, "web_find_first_available", side_effect = TimeoutError()), ): assert await test_bot.get_login_state() == LoginState.UNKNOWN @@ -633,8 +698,21 @@ class TestKleinanzeigenBotAuthentication: stdin_mock.isatty.return_value = True with ( - patch.object(test_bot, "_auth_probe_login_state", new_callable = AsyncMock, return_value = LoginState.UNKNOWN), - patch.object(test_bot, "web_text_first_available", side_effect = TimeoutError), + patch.object( + test_bot, + "web_text_first_available", + side_effect = [ + TimeoutError(), + TimeoutError(), + TimeoutError(), + TimeoutError(), + TimeoutError(), + TimeoutError(), + TimeoutError(), + TimeoutError(), + ], + ), + patch.object(test_bot, "web_find_first_available", side_effect = TimeoutError()), patch("kleinanzeigen_bot.sys.stdin", stdin_mock), patch("kleinanzeigen_bot.ainput", new_callable = AsyncMock) as mock_ainput, ): @@ -661,8 +739,8 @@ class TestKleinanzeigenBotAuthentication: stdin_mock.isatty.return_value = False with ( - patch.object(test_bot, "_auth_probe_login_state", new_callable = AsyncMock, return_value = LoginState.UNKNOWN), - patch.object(test_bot, "web_text_first_available", side_effect = TimeoutError), + patch.object(test_bot, "web_text_first_available", side_effect = [TimeoutError(), TimeoutError(), TimeoutError(), TimeoutError()]), + patch.object(test_bot, "web_find_first_available", side_effect = TimeoutError()), patch("kleinanzeigen_bot.sys.stdin", 
stdin_mock), patch("kleinanzeigen_bot.ainput", new_callable = AsyncMock) as mock_ainput, ): @@ -676,67 +754,71 @@ class TestKleinanzeigenBotAuthentication: with ( patch.object(test_bot, "web_open") as mock_open, patch.object(test_bot, "get_login_state", new_callable = AsyncMock, side_effect = [LoginState.LOGGED_OUT, LoginState.LOGGED_IN]) as mock_logged_in, - patch.object(test_bot, "web_find", side_effect = TimeoutError), - patch.object(test_bot, "web_input") as mock_input, - patch.object(test_bot, "web_click") as mock_click, + patch.object(test_bot, "_click_gdpr_banner", new_callable = AsyncMock), + patch.object(test_bot, "fill_login_data_and_send", new_callable = AsyncMock) as mock_fill, + patch.object(test_bot, "handle_after_login_logic", new_callable = AsyncMock) as mock_after_login, + patch.object(test_bot, "_dismiss_consent_banner", new_callable = AsyncMock), ): await test_bot.login() - mock_open.assert_called() - mock_logged_in.assert_called() - mock_input.assert_called() - mock_click.assert_called() + opened_urls = [call.args[0] for call in mock_open.call_args_list] + assert any(url.startswith(test_bot.root_url) for url in opened_urls) + assert any(url.endswith("/m-einloggen-sso.html") for url in opened_urls) + mock_logged_in.assert_awaited() + mock_fill.assert_awaited_once() + mock_after_login.assert_awaited_once() @pytest.mark.asyncio - async def test_login_flow_handles_captcha(self, test_bot:KleinanzeigenBot) -> None: - """Verify that login flow handles captcha correctly.""" + async def test_login_flow_returns_early_when_already_logged_in(self, test_bot:KleinanzeigenBot) -> None: + """Login should return early when state is already LOGGED_IN.""" with ( - patch.object(test_bot, "web_open"), - patch.object( - test_bot, - "get_login_state", - new_callable = AsyncMock, - side_effect = [LoginState.LOGGED_OUT, LoginState.LOGGED_OUT, LoginState.LOGGED_IN], - ), - patch.object(test_bot, "web_find") as mock_find, - patch.object(test_bot, "web_input") as 
mock_input, - patch.object(test_bot, "web_click") as mock_click, - patch("kleinanzeigen_bot.ainput", new_callable = AsyncMock) as mock_ainput, + patch.object(test_bot, "web_open") as mock_open, + patch.object(test_bot, "get_login_state", new_callable = AsyncMock, return_value = LoginState.LOGGED_IN) as mock_state, + patch.object(test_bot, "_click_gdpr_banner", new_callable = AsyncMock), + patch.object(test_bot, "fill_login_data_and_send", new_callable = AsyncMock) as mock_fill, + patch.object(test_bot, "handle_after_login_logic", new_callable = AsyncMock) as mock_after_login, ): - # Mock the sequence of web_find calls: - # 0. Consent banner not found (in _dismiss_consent_banner, before login state check) - # First login attempt: - # 1. Captcha iframe found (in check_and_wait_for_captcha) - # 2. Phone verification not found (in handle_after_login_logic) - # 3. Email verification not found (in handle_after_login_logic) - # 4. GDPR banner not found (in handle_after_login_logic) - # Second login attempt: - # 5. Captcha iframe found (in check_and_wait_for_captcha) - # 6. Phone verification not found (in handle_after_login_logic) - # 7. Email verification not found (in handle_after_login_logic) - # 8. 
GDPR banner not found (in handle_after_login_logic) - mock_find.side_effect = [ - TimeoutError(), # Consent banner (before login state check) - AsyncMock(), # Captcha iframe (first login) - TimeoutError(), # Phone verification (first login) - TimeoutError(), # Email verification (first login) - TimeoutError(), # GDPR banner (first login) - AsyncMock(), # Captcha iframe (second login) - TimeoutError(), # Phone verification (second login) - TimeoutError(), # Email verification (second login) - TimeoutError(), # GDPR banner (second login) - ] - mock_ainput.return_value = "" - mock_input.return_value = AsyncMock() - mock_click.return_value = AsyncMock() - await test_bot.login() - # Verify the complete flow - assert mock_find.call_count == 9 # 1 consent banner + 8 original web_find calls - assert mock_ainput.call_count == 2 # Two captcha prompts - assert mock_input.call_count == 6 # Two login attempts with username, clear password, and set password - assert mock_click.call_count == 2 # Two submit button clicks + mock_open.assert_awaited_once() + assert mock_open.await_args is not None + assert mock_open.await_args.args[0] == test_bot.root_url + mock_state.assert_awaited_once() + mock_fill.assert_not_called() + mock_after_login.assert_not_called() + + @pytest.mark.asyncio + async def test_login_flow_raises_when_state_remains_unknown(self, test_bot:KleinanzeigenBot) -> None: + """Post-login UNKNOWN state should fail fast with diagnostics.""" + with ( + patch.object(test_bot, "web_open"), + patch.object(test_bot, "get_login_state", new_callable = AsyncMock, side_effect = [LoginState.LOGGED_OUT, LoginState.UNKNOWN]) as mock_state, + patch.object(test_bot, "_click_gdpr_banner", new_callable = AsyncMock), + patch.object(test_bot, "fill_login_data_and_send", new_callable = AsyncMock), + patch.object(test_bot, "handle_after_login_logic", new_callable = AsyncMock), + patch.object(test_bot, "_dismiss_consent_banner", new_callable = AsyncMock), + patch.object(test_bot, 
"_capture_login_detection_diagnostics_if_enabled", new_callable = AsyncMock) as mock_diagnostics, + ): + with pytest.raises(AssertionError, match = "Login could not be confirmed"): + await test_bot.login() + + mock_diagnostics.assert_awaited_once() + mock_state.assert_awaited() + + @pytest.mark.asyncio + async def test_login_flow_raises_when_sso_navigation_times_out(self, test_bot:KleinanzeigenBot) -> None: + """SSO navigation timeout should trigger diagnostics and re-raise.""" + with ( + patch.object(test_bot, "web_open", new_callable = AsyncMock, side_effect = [None, TimeoutError("sso timeout")]), + patch.object(test_bot, "get_login_state", new_callable = AsyncMock, return_value = LoginState.LOGGED_OUT) as mock_state, + patch.object(test_bot, "_click_gdpr_banner", new_callable = AsyncMock), + patch.object(test_bot, "_capture_login_detection_diagnostics_if_enabled", new_callable = AsyncMock) as mock_diagnostics, + ): + with pytest.raises(TimeoutError, match = "sso timeout"): + await test_bot.login() + + mock_diagnostics.assert_awaited_once() + mock_state.assert_awaited_once() @pytest.mark.asyncio async def test_check_and_wait_for_captcha(self, test_bot:KleinanzeigenBot) -> None: @@ -764,62 +846,142 @@ class TestKleinanzeigenBotAuthentication: async def test_fill_login_data_and_send(self, test_bot:KleinanzeigenBot) -> None: """Verify that login form filling works correctly.""" with ( + patch.object(test_bot, "_wait_for_auth0_login_context", new_callable = AsyncMock) as wait_context, + patch.object(test_bot, "_wait_for_auth0_password_step", new_callable = AsyncMock) as wait_password, + patch.object(test_bot, "_wait_for_post_auth0_submit_transition", new_callable = AsyncMock) as wait_transition, patch.object(test_bot, "web_input") as mock_input, patch.object(test_bot, "web_click") as mock_click, patch.object(test_bot, "check_and_wait_for_captcha", new_callable = AsyncMock) as mock_captcha, ): - # Mock successful login form interaction - mock_input.return_value = 
AsyncMock() - mock_click.return_value = AsyncMock() - await test_bot.fill_login_data_and_send() + wait_context.assert_awaited_once() + wait_password.assert_awaited_once() + wait_transition.assert_awaited_once() assert mock_captcha.call_count == 1 - assert mock_input.call_count == 3 # Username, clear password, set password - assert mock_click.call_count == 1 # Submit button + assert mock_input.call_count == 2 + assert mock_click.call_count == 2 + + @pytest.mark.asyncio + async def test_fill_login_data_and_send_logs_generic_start_message( + self, test_bot:KleinanzeigenBot, caplog:pytest.LogCaptureFixture + ) -> None: + with ( + caplog.at_level("INFO"), + patch.object(test_bot, "_wait_for_auth0_login_context", new_callable = AsyncMock), + patch.object(test_bot, "_wait_for_auth0_password_step", new_callable = AsyncMock), + patch.object(test_bot, "_wait_for_post_auth0_submit_transition", new_callable = AsyncMock), + patch.object(test_bot, "web_input"), + patch.object(test_bot, "web_click"), + patch.object(test_bot, "check_and_wait_for_captcha", new_callable = AsyncMock), + ): + await test_bot.fill_login_data_and_send() + + assert "Logging in..." 
in caplog.text + assert test_bot.config.login.username not in caplog.text + + @pytest.mark.asyncio + async def test_fill_login_data_and_send_fails_when_password_step_missing(self, test_bot:KleinanzeigenBot) -> None: + """Missing Auth0 password step should fail fast.""" + with ( + patch.object(test_bot, "_wait_for_auth0_login_context", new_callable = AsyncMock), + patch.object(test_bot, "_wait_for_auth0_password_step", new_callable = AsyncMock, side_effect = AssertionError("missing password")), + patch.object(test_bot, "web_input") as mock_input, + patch.object(test_bot, "web_click") as mock_click, + ): + with pytest.raises(AssertionError, match = "missing password"): + await test_bot.fill_login_data_and_send() + + assert mock_input.call_count == 1 + assert mock_click.call_count == 1 + + @pytest.mark.asyncio + async def test_wait_for_post_auth0_submit_transition_url_branch(self, test_bot:KleinanzeigenBot) -> None: + """URL transition success should return without fallback checks.""" + with ( + patch.object(test_bot, "web_await", new_callable = AsyncMock, return_value = True) as mock_wait, + patch.object(test_bot, "web_sleep", new_callable = AsyncMock) as mock_sleep, + ): + await test_bot._wait_for_post_auth0_submit_transition() + + mock_wait.assert_awaited_once() + mock_sleep.assert_not_called() + + @pytest.mark.asyncio + async def test_wait_for_post_auth0_submit_transition_dom_fallback_branch(self, test_bot:KleinanzeigenBot) -> None: + """DOM fallback should run when URL transition is inconclusive.""" + with ( + patch.object(test_bot, "web_await", new_callable = AsyncMock, side_effect = [TimeoutError()]) as mock_wait, + patch.object(test_bot, "is_logged_in", new_callable = AsyncMock, return_value = True) as mock_is_logged_in, + patch.object(test_bot, "web_sleep", new_callable = AsyncMock) as mock_sleep, + ): + await test_bot._wait_for_post_auth0_submit_transition() + + mock_wait.assert_awaited_once() + mock_is_logged_in.assert_awaited_once() + 
mock_sleep.assert_not_called() + + @pytest.mark.asyncio + async def test_wait_for_post_auth0_submit_transition_sleep_fallback_branch(self, test_bot:KleinanzeigenBot) -> None: + """Sleep fallback should run when bounded login check times out.""" + with ( + patch.object(test_bot, "web_await", new_callable = AsyncMock, side_effect = [TimeoutError()]) as mock_wait, + patch.object(test_bot, "is_logged_in", new_callable = AsyncMock, side_effect = asyncio.TimeoutError) as mock_is_logged_in, + patch.object(test_bot, "web_sleep", new_callable = AsyncMock) as mock_sleep, + ): + with pytest.raises(TimeoutError, match = "Auth0 post-submit verification remained inconclusive"): + await test_bot._wait_for_post_auth0_submit_transition() + + mock_wait.assert_awaited_once() + assert mock_is_logged_in.await_count == 2 + mock_sleep.assert_awaited_once() + assert mock_sleep.await_args is not None + sleep_kwargs = cast(Any, mock_sleep.await_args).kwargs + assert sleep_kwargs["min_ms"] < sleep_kwargs["max_ms"] + + @pytest.mark.asyncio + async def test_wait_for_post_auth0_submit_transition_sleep_fallback_when_login_not_confirmed( + self, test_bot:KleinanzeigenBot + ) -> None: + """Sleep fallback should run when bounded login check returns False.""" + with ( + patch.object(test_bot, "web_await", new_callable = AsyncMock, side_effect = [TimeoutError()]) as mock_wait, + patch.object(test_bot, "is_logged_in", new_callable = AsyncMock, return_value = False) as mock_is_logged_in, + patch.object(test_bot, "web_sleep", new_callable = AsyncMock) as mock_sleep, + ): + with pytest.raises(TimeoutError, match = "Auth0 post-submit verification remained inconclusive"): + await test_bot._wait_for_post_auth0_submit_transition() + + mock_wait.assert_awaited_once() + assert mock_is_logged_in.await_count == 2 + mock_sleep.assert_awaited_once() + + @pytest.mark.asyncio + async def test_click_gdpr_banner_uses_quick_dom_timeout_and_passes_click_timeout(self, test_bot:KleinanzeigenBot) -> None: + with ( + 
patch.object(test_bot, "_timeout", return_value = 1.25) as mock_timeout, + patch.object(test_bot, "web_find", new_callable = AsyncMock) as mock_find, + patch.object(test_bot, "web_click", new_callable = AsyncMock) as mock_click, + ): + await test_bot._click_gdpr_banner() + + mock_timeout.assert_called_once_with("quick_dom") + mock_find.assert_awaited_once_with(By.ID, "gdpr-banner-accept", timeout = 1.25) + mock_click.assert_awaited_once_with(By.ID, "gdpr-banner-accept", timeout = 1.25) @pytest.mark.asyncio async def test_handle_after_login_logic(self, test_bot:KleinanzeigenBot) -> None: """Verify that post-login handling works correctly.""" with ( - patch.object(test_bot, "web_find") as mock_find, - patch.object(test_bot, "web_click") as mock_click, - patch("kleinanzeigen_bot.ainput", new_callable = AsyncMock) as mock_ainput, + patch.object(test_bot, "_check_sms_verification", new_callable = AsyncMock, side_effect = TimeoutError()) as mock_sms, + patch.object(test_bot, "_check_email_verification", new_callable = AsyncMock, side_effect = TimeoutError()) as mock_email, + patch.object(test_bot, "_click_gdpr_banner", new_callable = AsyncMock, side_effect = TimeoutError()) as mock_gdpr, ): - # Test case 1: No special handling needed - mock_find.side_effect = [TimeoutError(), TimeoutError(), TimeoutError()] # No phone verification, no email verification, no GDPR - mock_click.return_value = AsyncMock() - mock_ainput.return_value = "" - await test_bot.handle_after_login_logic() - assert mock_find.call_count == 3 - assert mock_click.call_count == 0 - assert mock_ainput.call_count == 0 - - # Test case 2: Phone verification needed - mock_find.reset_mock() - mock_click.reset_mock() - mock_ainput.reset_mock() - mock_find.side_effect = [AsyncMock(), TimeoutError(), TimeoutError()] # Phone verification found, no email verification, no GDPR - - await test_bot.handle_after_login_logic() - - assert mock_find.call_count == 3 - assert mock_click.call_count == 0 # No click needed, just 
wait for user - assert mock_ainput.call_count == 1 # Wait for user to complete verification - - # Test case 3: GDPR banner present - mock_find.reset_mock() - mock_click.reset_mock() - mock_ainput.reset_mock() - mock_find.side_effect = [TimeoutError(), TimeoutError(), AsyncMock()] # No phone verification, no email verification, GDPR found - - await test_bot.handle_after_login_logic() - - assert mock_find.call_count == 3 - assert mock_click.call_count == 2 # Click to accept GDPR and continue - assert mock_ainput.call_count == 0 + mock_sms.assert_awaited_once() + mock_email.assert_awaited_once() + mock_gdpr.assert_awaited_once() class TestKleinanzeigenBotDiagnostics: @@ -866,9 +1028,10 @@ class TestKleinanzeigenBotDiagnostics: ad_cfg = Ad.model_validate(diagnostics_ad_config) ad_cfg_orig = copy.deepcopy(diagnostics_ad_config) ad_file = str(tmp_path / "ad_000001_Test.yml") + ads_response = {"content": json.dumps({"ads": [], "paging": {"pageNum": 1, "last": 1}})} with ( - patch.object(test_bot, "web_request", new_callable = AsyncMock, return_value = {"content": json.dumps({"ads": []})}), + patch.object(test_bot, "web_request", new_callable = AsyncMock, return_value = ads_response), patch.object(test_bot, "publish_ad", new_callable = AsyncMock, side_effect = TimeoutError("boom")), ): await test_bot.publish_ads([(ad_file, ad_cfg, ad_cfg_orig)]) @@ -907,9 +1070,10 @@ class TestKleinanzeigenBotDiagnostics: ad_cfg = Ad.model_validate(diagnostics_ad_config) ad_cfg_orig = copy.deepcopy(diagnostics_ad_config) ad_file = str(tmp_path / "ad_000001_Test.yml") + ads_response = {"content": json.dumps({"ads": [], "paging": {"pageNum": 1, "last": 1}})} with ( - patch.object(test_bot, "web_request", new_callable = AsyncMock, return_value = {"content": json.dumps({"ads": []})}), + patch.object(test_bot, "web_request", new_callable = AsyncMock, return_value = ads_response), patch.object(test_bot, "publish_ad", new_callable = AsyncMock, side_effect = TimeoutError("boom")), ): await 
test_bot.publish_ads([(ad_file, ad_cfg, ad_cfg_orig)]) @@ -1015,6 +1179,35 @@ class TestKleinanzeigenBotBasics: web_await_mock.assert_awaited_once() delete_ad_mock.assert_awaited_once_with(ad_cfgs[0][1], [], delete_old_ads_by_title = False) + @pytest.mark.asyncio + async def test_publish_ads_uses_millisecond_retry_delay_on_retryable_failure( + self, + test_bot:KleinanzeigenBot, + base_ad_config:dict[str, Any], + mock_page:MagicMock, + ) -> None: + """Retry branch should sleep with explicit millisecond delay.""" + test_bot.page = mock_page + test_bot.keep_old_ads = True + + ad_cfg = Ad.model_validate(base_ad_config) + ad_cfg_orig = copy.deepcopy(base_ad_config) + ad_file = "ad.yaml" + ads_response = {"content": json.dumps({"ads": [], "paging": {"pageNum": 1, "last": 1}})} + + with ( + patch.object(test_bot, "web_request", new_callable = AsyncMock, return_value = ads_response), + patch.object(test_bot, "publish_ad", new_callable = AsyncMock, side_effect = [TimeoutError("transient"), None]) as publish_mock, + patch.object(test_bot, "_detect_new_published_ad_ids", new_callable = AsyncMock, return_value = set()) as detect_mock, + patch.object(test_bot, "web_sleep", new_callable = AsyncMock) as sleep_mock, + patch.object(test_bot, "web_await", new_callable = AsyncMock, return_value = True), + ): + await test_bot.publish_ads([(ad_file, ad_cfg, ad_cfg_orig)]) + + assert publish_mock.await_count == 2 + detect_mock.assert_awaited_once() + sleep_mock.assert_awaited_once_with(2_000) + @pytest.mark.asyncio async def test_publish_ads_aborts_retry_on_duplicate_detection( self, @@ -1047,6 +1240,62 @@ class TestKleinanzeigenBotBasics: # publish_ad should have been called only once — retry was aborted due to duplicate detection assert publish_mock.await_count == 1 + @pytest.mark.asyncio + async def test_publish_ads_aborts_retry_when_duplicate_verification_fetch_is_malformed( + self, + test_bot:KleinanzeigenBot, + base_ad_config:dict[str, Any], + mock_page:MagicMock, + ) -> None: + 
"""Retry verification must fail closed on malformed published-ads responses.""" + test_bot.page = mock_page + + ad_cfg = Ad.model_validate(base_ad_config) + ad_cfg_orig = copy.deepcopy(base_ad_config) + ad_file = "ad.yaml" + + fetch_responses = [ + {"content": json.dumps({"ads": []})}, + {"content": json.dumps({"ads": []})}, + [], + ] + + with ( + patch.object(test_bot, "web_request", new_callable = AsyncMock, side_effect = fetch_responses), + patch.object(test_bot, "publish_ad", new_callable = AsyncMock, side_effect = TimeoutError("image upload timeout")) as publish_mock, + ): + await test_bot.publish_ads([(ad_file, ad_cfg, ad_cfg_orig)]) + + assert publish_mock.await_count == 1 + + @pytest.mark.asyncio + async def test_publish_ads_aborts_retry_when_duplicate_verification_ads_entries_are_malformed( + self, + test_bot:KleinanzeigenBot, + base_ad_config:dict[str, Any], + mock_page:MagicMock, + ) -> None: + """Retry verification must fail closed when strict fetch returns non-dict ad entries.""" + test_bot.page = mock_page + + ad_cfg = Ad.model_validate(base_ad_config) + ad_cfg_orig = copy.deepcopy(base_ad_config) + ad_file = "ad.yaml" + + fetch_responses = [ + {"content": json.dumps({"ads": [], "paging": {"pageNum": 1, "last": 1}})}, + {"content": json.dumps({"ads": [], "paging": {"pageNum": 1, "last": 1}})}, + {"content": json.dumps({"ads": [42], "paging": {"pageNum": 1, "last": 1}})}, + ] + + with ( + patch.object(test_bot, "web_request", new_callable = AsyncMock, side_effect = fetch_responses), + patch.object(test_bot, "publish_ad", new_callable = AsyncMock, side_effect = TimeoutError("image upload timeout")) as publish_mock, + ): + await test_bot.publish_ads([(ad_file, ad_cfg, ad_cfg_orig)]) + + assert publish_mock.await_count == 1 + def test_get_root_url(self, test_bot:KleinanzeigenBot) -> None: """Test root URL retrieval.""" assert test_bot.root_url == "https://www.kleinanzeigen.de" diff --git a/tests/unit/test_json_pagination.py 
b/tests/unit/test_json_pagination.py index bfe967f..047d6c7 100644 --- a/tests/unit/test_json_pagination.py +++ b/tests/unit/test_json_pagination.py @@ -187,6 +187,17 @@ class TestJSONPagination: pytest.fail(f"expected 2 ads, got {len(result)}") mock_request.assert_awaited_once() + @pytest.mark.asyncio + async def test_fetch_published_ads_strict_raises_on_missing_paging_dict(self, bot:KleinanzeigenBot) -> None: + """Strict mode should fail closed when paging metadata is missing.""" + response_data = {"ads": [{"id": 1}, {"id": 2}]} + + with patch.object(bot, "web_request", new_callable = AsyncMock) as mock_request: + mock_request.return_value = {"content": json.dumps(response_data)} + + with pytest.raises(ValueError, match = "Missing or invalid paging info on page 1: NoneType"): + await bot._fetch_published_ads(strict = True) + @pytest.mark.asyncio async def test_fetch_published_ads_non_integer_paging_values(self, bot:KleinanzeigenBot) -> None: """Test handling of non-integer paging values.""" @@ -219,6 +230,33 @@ class TestJSONPagination: if len(result) != 0: pytest.fail(f"expected empty list when 'ads' is not a list, got: {result}") + @pytest.mark.asyncio + async def test_fetch_published_ads_strict_rejects_non_dict_entries(self, bot:KleinanzeigenBot) -> None: + """Strict mode should reject malformed entries inside ads list.""" + response_data = {"ads": [42, {"id": 1}], "paging": {"pageNum": 1, "last": 1}} + + with patch.object(bot, "web_request", new_callable = AsyncMock) as mock_request: + mock_request.return_value = {"content": json.dumps(response_data)} + + with pytest.raises(TypeError, match = "Unexpected ad entry type on page 1: int"): + await bot._fetch_published_ads(strict = True) + + @pytest.mark.asyncio + async def test_fetch_published_ads_non_strict_filters_non_dict_entries(self, bot:KleinanzeigenBot, caplog:pytest.LogCaptureFixture) -> None: + """Non-strict mode should filter malformed entries and continue.""" + response_data = {"ads": [42, {"id": 1}, 
"broken"], "paging": {"pageNum": 1, "last": 1}} + + with patch.object(bot, "web_request", new_callable = AsyncMock) as mock_request: + mock_request.return_value = {"content": json.dumps(response_data)} + + with caplog.at_level("WARNING"): + result = await bot._fetch_published_ads(strict = False) + + if result != [{"id": 1}]: + pytest.fail(f"expected malformed entries to be filtered out, got: {result}") + if "Filtered 2 malformed ad entries on page 1" not in caplog.text: + pytest.fail(f"expected malformed-entry warning in logs, got: {caplog.text}") + @pytest.mark.asyncio async def test_fetch_published_ads_timeout(self, bot:KleinanzeigenBot) -> None: """Test handling of timeout during pagination.""" @@ -229,3 +267,26 @@ class TestJSONPagination: if result != []: pytest.fail(f"Expected empty list on timeout, got {result}") + + @pytest.mark.asyncio + async def test_fetch_published_ads_non_strict_handles_non_string_content_type(self, bot:KleinanzeigenBot, caplog:pytest.LogCaptureFixture) -> None: + """Non-strict mode should gracefully stop on unexpected non-string content types.""" + with patch.object(bot, "web_request", new_callable = AsyncMock) as mock_request: + mock_request.return_value = {"content": None} + + with caplog.at_level("WARNING"): + result = await bot._fetch_published_ads(strict = False) + + if result != []: + pytest.fail(f"expected empty result on non-string content in non-strict mode, got: {result}") + if "Unexpected response content type on page 1: NoneType" not in caplog.text: + pytest.fail(f"expected non-string content warning in logs, got: {caplog.text}") + + @pytest.mark.asyncio + async def test_fetch_published_ads_strict_raises_on_non_string_content_type(self, bot:KleinanzeigenBot) -> None: + """Strict mode should fail closed on unexpected non-string content types.""" + with patch.object(bot, "web_request", new_callable = AsyncMock) as mock_request: + mock_request.return_value = {"content": None} + + with pytest.raises(TypeError, match = 
"Unexpected response content type on page 1: NoneType"): + await bot._fetch_published_ads(strict = True)