mirror of
https://github.com/Second-Hand-Friends/kleinanzeigen-bot.git
synced 2026-03-16 12:21:50 +01:00
fix: Auth0-Login-Migration und GDPR-Banner-Fix (#870)
This commit is contained in:
@@ -38,7 +38,10 @@ _LOGIN_DETECTION_SELECTORS:Final[list[tuple["By", str]]] = [
|
|||||||
(By.CLASS_NAME, "mr-medium"),
|
(By.CLASS_NAME, "mr-medium"),
|
||||||
(By.ID, "user-email"),
|
(By.ID, "user-email"),
|
||||||
]
|
]
|
||||||
_LOGIN_DETECTION_SELECTOR_LABELS:Final[tuple[str, ...]] = ("user_info_primary", "user_info_secondary")
|
_LOGGED_OUT_CTA_SELECTORS:Final[list[tuple["By", str]]] = [
|
||||||
|
(By.CSS_SELECTOR, 'a[href*="einloggen"]'),
|
||||||
|
(By.CSS_SELECTOR, 'a[href*="/m-einloggen"]'),
|
||||||
|
]
|
||||||
|
|
||||||
colorama.just_fix_windows_console()
|
colorama.just_fix_windows_console()
|
||||||
|
|
||||||
@@ -997,95 +1000,203 @@ class KleinanzeigenBot(WebScrapingMixin): # noqa: PLR0904
|
|||||||
|
|
||||||
await ainput(_("Press a key to continue..."))
|
await ainput(_("Press a key to continue..."))
|
||||||
except TimeoutError:
|
except TimeoutError:
|
||||||
# No captcha detected within timeout.
|
page_context = "login page" if is_login_page else "publish flow"
|
||||||
pass
|
LOG.debug("No captcha detected within timeout on %s", page_context)
|
||||||
|
|
||||||
async def login(self) -> None:
|
async def login(self) -> None:
|
||||||
|
sso_navigation_timeout = self._timeout("page_load")
|
||||||
|
pre_login_gdpr_timeout = self._timeout("quick_dom")
|
||||||
|
|
||||||
LOG.info("Checking if already logged in...")
|
LOG.info("Checking if already logged in...")
|
||||||
await self.web_open(f"{self.root_url}")
|
await self.web_open(f"{self.root_url}")
|
||||||
if getattr(self, "page", None) is not None:
|
try:
|
||||||
LOG.debug("Current page URL after opening homepage: %s", self.page.url)
|
await self._click_gdpr_banner(timeout = pre_login_gdpr_timeout)
|
||||||
|
except TimeoutError:
|
||||||
|
LOG.debug("No GDPR banner detected before login")
|
||||||
|
|
||||||
|
state = await self.get_login_state(capture_diagnostics = False)
|
||||||
|
if state == LoginState.LOGGED_IN:
|
||||||
|
LOG.info("Already logged in. Skipping login.")
|
||||||
|
return
|
||||||
|
|
||||||
|
LOG.debug("Navigating to SSO login page (Auth0)...")
|
||||||
|
# m-einloggen-sso.html triggers immediate server-side redirect to Auth0
|
||||||
|
# This avoids waiting for JS on m-einloggen.html which may not execute in headless mode
|
||||||
|
try:
|
||||||
|
await self.web_open(f"{self.root_url}/m-einloggen-sso.html", timeout = sso_navigation_timeout)
|
||||||
|
except TimeoutError:
|
||||||
|
LOG.warning("Timeout navigating to SSO login page after %.1fs", sso_navigation_timeout)
|
||||||
|
await self._capture_login_detection_diagnostics_if_enabled()
|
||||||
|
raise
|
||||||
|
|
||||||
|
self._login_detection_diagnostics_captured = False
|
||||||
|
|
||||||
|
try:
|
||||||
|
await self.fill_login_data_and_send()
|
||||||
|
await self.handle_after_login_logic()
|
||||||
|
except (AssertionError, TimeoutError):
|
||||||
|
# AssertionError is intentionally part of auth-boundary control flow so
|
||||||
|
# diagnostics are captured before the original error is re-raised.
|
||||||
|
await self._capture_login_detection_diagnostics_if_enabled()
|
||||||
|
raise
|
||||||
|
|
||||||
await self._dismiss_consent_banner()
|
await self._dismiss_consent_banner()
|
||||||
|
|
||||||
state = await self.get_login_state()
|
state = await self.get_login_state()
|
||||||
if state == LoginState.LOGGED_IN:
|
if state == LoginState.LOGGED_IN:
|
||||||
LOG.info("Already logged in as [%s]. Skipping login.", self.config.login.username)
|
LOG.info("Login confirmed.")
|
||||||
return
|
return
|
||||||
|
|
||||||
if state == LoginState.UNKNOWN:
|
current_url = self._current_page_url()
|
||||||
LOG.warning("Login state is UNKNOWN - cannot determine if already logged in. Skipping login attempt.")
|
LOG.warning("Login state after attempt is %s (url=%s)", state.name, current_url)
|
||||||
|
await self._capture_login_detection_diagnostics_if_enabled()
|
||||||
|
raise AssertionError(_("Login could not be confirmed after Auth0 flow (state=%s, url=%s)") % (state.name, current_url))
|
||||||
|
|
||||||
|
def _current_page_url(self) -> str:
|
||||||
|
page = getattr(self, "page", None)
|
||||||
|
if page is None:
|
||||||
|
return "unknown"
|
||||||
|
url = getattr(page, "url", None)
|
||||||
|
if not isinstance(url, str) or not url:
|
||||||
|
return "unknown"
|
||||||
|
|
||||||
|
parsed = urllib_parse.urlparse(url)
|
||||||
|
host = parsed.hostname or parsed.netloc.split("@")[-1]
|
||||||
|
netloc = f"{host}:{parsed.port}" if parsed.port is not None and host else host
|
||||||
|
sanitized = urllib_parse.urlunparse((parsed.scheme, netloc, parsed.path, "", "", ""))
|
||||||
|
return sanitized or "unknown"
|
||||||
|
|
||||||
|
async def _wait_for_auth0_login_context(self) -> None:
|
||||||
|
redirect_timeout = self._timeout("login_detection")
|
||||||
|
try:
|
||||||
|
await self.web_await(
|
||||||
|
lambda: "login.kleinanzeigen.de" in self._current_page_url() or "/u/login" in self._current_page_url(),
|
||||||
|
timeout = redirect_timeout,
|
||||||
|
timeout_error_message = f"Auth0 redirect did not start within {redirect_timeout} seconds",
|
||||||
|
apply_multiplier = False,
|
||||||
|
)
|
||||||
|
except TimeoutError as ex:
|
||||||
|
current_url = self._current_page_url()
|
||||||
|
raise AssertionError(_("Auth0 redirect not detected (url=%s)") % current_url) from ex
|
||||||
|
|
||||||
|
async def _wait_for_auth0_password_step(self) -> None:
|
||||||
|
password_step_timeout = self._timeout("login_detection")
|
||||||
|
try:
|
||||||
|
await self.web_await(
|
||||||
|
lambda: "/u/login/password" in self._current_page_url(),
|
||||||
|
timeout = password_step_timeout,
|
||||||
|
timeout_error_message = f"Auth0 password page not reached within {password_step_timeout} seconds",
|
||||||
|
apply_multiplier = False,
|
||||||
|
)
|
||||||
|
except TimeoutError as ex:
|
||||||
|
current_url = self._current_page_url()
|
||||||
|
raise AssertionError(_("Auth0 password step not reached (url=%s)") % current_url) from ex
|
||||||
|
|
||||||
|
async def _wait_for_post_auth0_submit_transition(self) -> None:
|
||||||
|
post_submit_timeout = self._timeout("login_detection")
|
||||||
|
quick_dom_timeout = self._timeout("quick_dom")
|
||||||
|
fallback_max_ms = max(700, int(quick_dom_timeout * 1_000))
|
||||||
|
fallback_min_ms = max(300, fallback_max_ms // 2)
|
||||||
|
|
||||||
|
try:
|
||||||
|
await self.web_await(
|
||||||
|
lambda: self._is_valid_post_auth0_destination(self._current_page_url()),
|
||||||
|
timeout = post_submit_timeout,
|
||||||
|
timeout_error_message = f"Auth0 post-submit transition did not complete within {post_submit_timeout} seconds",
|
||||||
|
apply_multiplier = False,
|
||||||
|
)
|
||||||
|
return
|
||||||
|
except TimeoutError:
|
||||||
|
LOG.debug("Post-submit transition not detected via URL, checking logged-in selectors")
|
||||||
|
|
||||||
|
login_confirmed = False
|
||||||
|
try:
|
||||||
|
login_confirmed = await asyncio.wait_for(self.is_logged_in(include_probe = False), timeout = post_submit_timeout)
|
||||||
|
except (TimeoutError, asyncio.TimeoutError):
|
||||||
|
LOG.debug("Post-submit login verification did not complete within %.1fs", post_submit_timeout)
|
||||||
|
|
||||||
|
if login_confirmed:
|
||||||
return
|
return
|
||||||
|
|
||||||
LOG.info("Opening login page...")
|
LOG.debug("Auth0 post-submit verification remained inconclusive; applying bounded fallback pause")
|
||||||
await self.web_open(f"{self.root_url}/m-einloggen.html?targetUrl=/")
|
await self.web_sleep(min_ms = fallback_min_ms, max_ms = fallback_max_ms)
|
||||||
|
|
||||||
await self.fill_login_data_and_send()
|
try:
|
||||||
await self.handle_after_login_logic()
|
if await asyncio.wait_for(self.is_logged_in(include_probe = False), timeout = quick_dom_timeout):
|
||||||
|
return
|
||||||
|
except (TimeoutError, asyncio.TimeoutError):
|
||||||
|
LOG.debug("Final post-submit login confirmation did not complete within %.1fs", quick_dom_timeout)
|
||||||
|
|
||||||
# Sometimes a second login is required
|
current_url = self._current_page_url()
|
||||||
state = await self.get_login_state()
|
raise TimeoutError(_("Auth0 post-submit verification remained inconclusive (url=%s)") % current_url)
|
||||||
if state == LoginState.UNKNOWN:
|
|
||||||
LOG.warning("Login state is UNKNOWN after first login attempt - cannot determine login status. Aborting login process.")
|
|
||||||
return
|
|
||||||
|
|
||||||
if state == LoginState.LOGGED_OUT:
|
def _is_valid_post_auth0_destination(self, url:str) -> bool:
|
||||||
LOG.debug("First login attempt did not succeed, trying second login attempt")
|
if not url or url in {"unknown", "about:blank"}:
|
||||||
await self.fill_login_data_and_send()
|
return False
|
||||||
await self.handle_after_login_logic()
|
|
||||||
|
|
||||||
state = await self.get_login_state()
|
parsed = urllib_parse.urlparse(url)
|
||||||
if state == LoginState.LOGGED_IN:
|
host = (parsed.hostname or "").lower()
|
||||||
LOG.debug("Second login attempt succeeded")
|
path = parsed.path.lower()
|
||||||
else:
|
|
||||||
LOG.warning("Second login attempt also failed - login may not have succeeded")
|
if host != "kleinanzeigen.de" and not host.endswith(".kleinanzeigen.de"):
|
||||||
|
return False
|
||||||
|
if host == "login.kleinanzeigen.de":
|
||||||
|
return False
|
||||||
|
if path.startswith("/u/login"):
|
||||||
|
return False
|
||||||
|
|
||||||
|
return "error" not in path
|
||||||
|
|
||||||
async def fill_login_data_and_send(self) -> None:
|
async def fill_login_data_and_send(self) -> None:
|
||||||
LOG.info("Logging in as [%s]...", self.config.login.username)
|
"""Auth0 2-step login via m-einloggen-sso.html (server-side redirect, no JS needed).
|
||||||
await self.web_input(By.ID, "login-email", self.config.login.username)
|
|
||||||
|
|
||||||
# clearing password input in case browser has stored login data set
|
Step 1: /u/login/identifier - email
|
||||||
await self.web_input(By.ID, "login-password", "")
|
Step 2: /u/login/password - password
|
||||||
await self.web_input(By.ID, "login-password", self.config.login.password)
|
"""
|
||||||
|
LOG.info("Logging in...")
|
||||||
|
|
||||||
|
await self._wait_for_auth0_login_context()
|
||||||
|
|
||||||
|
# Step 1: email identifier
|
||||||
|
LOG.debug("Auth0 Step 1: entering email...")
|
||||||
|
await self.web_input(By.ID, "username", self.config.login.username)
|
||||||
|
await self.web_click(By.CSS_SELECTOR, "button[type='submit']")
|
||||||
|
|
||||||
|
# Step 2: wait for password page then enter password
|
||||||
|
LOG.debug("Waiting for Auth0 password page...")
|
||||||
|
await self._wait_for_auth0_password_step()
|
||||||
|
|
||||||
|
LOG.debug("Auth0 Step 2: entering password...")
|
||||||
|
await self.web_input(By.CSS_SELECTOR, "input[type='password']", self.config.login.password)
|
||||||
await self.check_and_wait_for_captcha(is_login_page = True)
|
await self.check_and_wait_for_captcha(is_login_page = True)
|
||||||
|
await self.web_click(By.CSS_SELECTOR, "button[type='submit']")
|
||||||
await self.web_click(By.CSS_SELECTOR, "form#login-form button[type='submit']")
|
await self._wait_for_post_auth0_submit_transition()
|
||||||
|
LOG.debug("Auth0 login submitted.")
|
||||||
|
|
||||||
async def handle_after_login_logic(self) -> None:
|
async def handle_after_login_logic(self) -> None:
|
||||||
try:
|
try:
|
||||||
sms_timeout = self._timeout("sms_verification")
|
await self._check_sms_verification()
|
||||||
await self.web_find(By.TEXT, "Wir haben dir gerade einen 6-stelligen Code für die Telefonnummer", timeout = sms_timeout)
|
|
||||||
LOG.warning("############################################")
|
|
||||||
LOG.warning("# Device verification message detected. Please follow the instruction displayed in the Browser.")
|
|
||||||
LOG.warning("############################################")
|
|
||||||
await ainput(_("Press ENTER when done..."))
|
|
||||||
except TimeoutError:
|
except TimeoutError:
|
||||||
# No SMS verification prompt detected.
|
LOG.debug("No SMS verification prompt detected after login")
|
||||||
pass
|
|
||||||
|
|
||||||
try:
|
try:
|
||||||
email_timeout = self._timeout("email_verification")
|
await self._check_email_verification()
|
||||||
await self.web_find(By.TEXT, "Um dein Konto zu schützen haben wir dir eine E-Mail geschickt", timeout = email_timeout)
|
|
||||||
LOG.warning("############################################")
|
|
||||||
LOG.warning("# Device verification message detected. Please follow the instruction displayed in the Browser.")
|
|
||||||
LOG.warning("############################################")
|
|
||||||
await ainput(_("Press ENTER when done..."))
|
|
||||||
except TimeoutError:
|
except TimeoutError:
|
||||||
# No email verification prompt detected.
|
LOG.debug("No email verification prompt detected after login")
|
||||||
pass
|
|
||||||
|
|
||||||
try:
|
try:
|
||||||
LOG.info("Handling GDPR disclaimer...")
|
LOG.debug("Handling GDPR disclaimer...")
|
||||||
gdpr_timeout = self._timeout("gdpr_prompt")
|
await self._click_gdpr_banner()
|
||||||
await self.web_find(By.ID, "gdpr-banner-accept", timeout = gdpr_timeout)
|
|
||||||
await self.web_click(By.ID, "gdpr-banner-cmp-button")
|
|
||||||
await self.web_click(
|
|
||||||
By.XPATH, "//div[@id='ConsentManagementPage']//*//button//*[contains(., 'Alle ablehnen und fortfahren')]", timeout = gdpr_timeout
|
|
||||||
)
|
|
||||||
except TimeoutError:
|
except TimeoutError:
|
||||||
# GDPR banner not shown within timeout.
|
LOG.debug("GDPR banner not found or timed out")
|
||||||
pass
|
|
||||||
|
async def _check_sms_verification(self) -> None:
|
||||||
|
sms_timeout = self._timeout("sms_verification")
|
||||||
|
await self.web_find(By.TEXT, "Wir haben dir gerade einen 6-stelligen Code für die Telefonnummer", timeout = sms_timeout)
|
||||||
|
LOG.warning("############################################")
|
||||||
|
LOG.warning("# Device verification message detected. Please follow the instruction displayed in the Browser.")
|
||||||
|
LOG.warning("############################################")
|
||||||
|
await ainput(_("Press ENTER when done..."))
|
||||||
|
|
||||||
async def _dismiss_consent_banner(self) -> None:
|
async def _dismiss_consent_banner(self) -> None:
|
||||||
"""Dismiss the GDPR/TCF consent banner if it is present.
|
"""Dismiss the GDPR/TCF consent banner if it is present.
|
||||||
@@ -1100,65 +1211,39 @@ class KleinanzeigenBot(WebScrapingMixin): # noqa: PLR0904
|
|||||||
LOG.debug("Consent banner detected, clicking 'Alle akzeptieren'...")
|
LOG.debug("Consent banner detected, clicking 'Alle akzeptieren'...")
|
||||||
await self.web_click(By.ID, "gdpr-banner-accept")
|
await self.web_click(By.ID, "gdpr-banner-accept")
|
||||||
except TimeoutError:
|
except TimeoutError:
|
||||||
pass # Banner not present; nothing to dismiss
|
LOG.debug("Consent banner not present; continuing without dismissal")
|
||||||
|
|
||||||
async def _auth_probe_login_state(self) -> LoginState:
|
async def _check_email_verification(self) -> None:
|
||||||
"""Probe an auth-required endpoint to classify login state.
|
email_timeout = self._timeout("email_verification")
|
||||||
|
await self.web_find(By.TEXT, "Um dein Konto zu schützen haben wir dir eine E-Mail geschickt", timeout = email_timeout)
|
||||||
|
LOG.warning("############################################")
|
||||||
|
LOG.warning("# Device verification message detected. Please follow the instruction displayed in the Browser.")
|
||||||
|
LOG.warning("############################################")
|
||||||
|
await ainput(_("Press ENTER when done..."))
|
||||||
|
|
||||||
The probe is non-mutating (GET request). It is used as a fallback method by
|
async def _click_gdpr_banner(self, *, timeout:float | None = None) -> None:
|
||||||
get_login_state() when DOM-based checks are inconclusive.
|
gdpr_timeout = self._timeout("quick_dom") if timeout is None else timeout
|
||||||
"""
|
await self.web_find(By.ID, "gdpr-banner-accept", timeout = gdpr_timeout)
|
||||||
|
await self.web_click(By.ID, "gdpr-banner-accept", timeout = gdpr_timeout)
|
||||||
|
|
||||||
url = f"{self.root_url}/m-meine-anzeigen-verwalten.json?sort=DEFAULT"
|
async def get_login_state(self, *, capture_diagnostics:bool = True) -> LoginState:
|
||||||
try:
|
"""Determine current login state using DOM - first detection.
|
||||||
response = await self.web_request(url, valid_response_codes = [200, 401, 403])
|
|
||||||
except (TimeoutError, AssertionError):
|
|
||||||
# AssertionError can occur when web_request() fails to parse the response (e.g., unexpected content type)
|
|
||||||
# Treat both timeout and assertion failures as UNKNOWN to avoid false assumptions about login state
|
|
||||||
return LoginState.UNKNOWN
|
|
||||||
|
|
||||||
status_code = response.get("statusCode")
|
|
||||||
if status_code in {401, 403}:
|
|
||||||
return LoginState.LOGGED_OUT
|
|
||||||
|
|
||||||
content = response.get("content", "")
|
|
||||||
if not isinstance(content, str):
|
|
||||||
return LoginState.UNKNOWN
|
|
||||||
|
|
||||||
try:
|
|
||||||
payload = json.loads(content)
|
|
||||||
except json.JSONDecodeError:
|
|
||||||
lowered = content.lower()
|
|
||||||
if "m-einloggen" in lowered or "login-email" in lowered or "login-password" in lowered or "login-form" in lowered:
|
|
||||||
return LoginState.LOGGED_OUT
|
|
||||||
return LoginState.UNKNOWN
|
|
||||||
|
|
||||||
if isinstance(payload, dict) and "ads" in payload:
|
|
||||||
return LoginState.LOGGED_IN
|
|
||||||
|
|
||||||
return LoginState.UNKNOWN
|
|
||||||
|
|
||||||
async def get_login_state(self) -> LoginState:
|
|
||||||
"""Determine current login state using layered detection.
|
|
||||||
|
|
||||||
Order:
|
Order:
|
||||||
1) DOM-based check via `is_logged_in(include_probe=False)` (preferred - stealthy)
|
1) DOM - based logged - in check via `is_logged_in(include_probe=False)`
|
||||||
2) Server-side auth probe via `_auth_probe_login_state` (fallback - more reliable)
|
2) Logged - out CTA check
|
||||||
3) If still inconclusive, capture diagnostics via
|
3) If inconclusive, optionally capture diagnostics and return `UNKNOWN`
|
||||||
`_capture_login_detection_diagnostics_if_enabled` and return `UNKNOWN`
|
|
||||||
"""
|
"""
|
||||||
# Prefer DOM-based checks first to minimize bot-like behavior.
|
# Prefer DOM-based checks first to minimize bot-like behavior and avoid
|
||||||
# The auth probe makes a JSON API request that normal users wouldn't trigger.
|
# fragile API probing side effects. Server-side auth probing was removed.
|
||||||
if await self.is_logged_in(include_probe = False):
|
if await self.is_logged_in(include_probe = False):
|
||||||
return LoginState.LOGGED_IN
|
return LoginState.LOGGED_IN
|
||||||
|
|
||||||
# Fall back to the more reliable server-side auth probe.
|
if await self._has_logged_out_cta(log_timeout = False):
|
||||||
# SPA/hydration delays can cause DOM-based checks to temporarily miss login indicators.
|
return LoginState.LOGGED_OUT
|
||||||
state = await self._auth_probe_login_state()
|
|
||||||
if state != LoginState.UNKNOWN:
|
|
||||||
return state
|
|
||||||
|
|
||||||
await self._capture_login_detection_diagnostics_if_enabled()
|
if capture_diagnostics:
|
||||||
|
await self._capture_login_detection_diagnostics_if_enabled()
|
||||||
return LoginState.UNKNOWN
|
return LoginState.UNKNOWN
|
||||||
|
|
||||||
def _diagnostics_output_dir(self) -> Path:
|
def _diagnostics_output_dir(self) -> Path:
|
||||||
@@ -1271,8 +1356,27 @@ class KleinanzeigenBot(WebScrapingMixin): # noqa: PLR0904
|
|||||||
login_check_timeout,
|
login_check_timeout,
|
||||||
effective_timeout,
|
effective_timeout,
|
||||||
)
|
)
|
||||||
|
quick_dom_timeout = self._timeout("quick_dom")
|
||||||
tried_login_selectors = _format_login_detection_selectors(_LOGIN_DETECTION_SELECTORS)
|
tried_login_selectors = _format_login_detection_selectors(_LOGIN_DETECTION_SELECTORS)
|
||||||
|
|
||||||
|
try:
|
||||||
|
user_info, matched_selector = await self.web_text_first_available(
|
||||||
|
_LOGIN_DETECTION_SELECTORS,
|
||||||
|
timeout = quick_dom_timeout,
|
||||||
|
key = "quick_dom",
|
||||||
|
description = "login_detection(quick_logged_in)",
|
||||||
|
)
|
||||||
|
if username in user_info.lower():
|
||||||
|
matched_selector_display = (
|
||||||
|
f"{_LOGIN_DETECTION_SELECTORS[matched_selector][0].name}={_LOGIN_DETECTION_SELECTORS[matched_selector][1]}"
|
||||||
|
if 0 <= matched_selector < len(_LOGIN_DETECTION_SELECTORS)
|
||||||
|
else f"selector_index_{matched_selector}"
|
||||||
|
)
|
||||||
|
LOG.debug("Login detected via login detection selector '%s'", matched_selector_display)
|
||||||
|
return True
|
||||||
|
except TimeoutError:
|
||||||
|
LOG.debug("No login detected via configured login detection selectors (%s)", tried_login_selectors)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
user_info, matched_selector = await self.web_text_first_available(
|
user_info, matched_selector = await self.web_text_first_available(
|
||||||
_LOGIN_DETECTION_SELECTORS,
|
_LOGIN_DETECTION_SELECTORS,
|
||||||
@@ -1281,32 +1385,60 @@ class KleinanzeigenBot(WebScrapingMixin): # noqa: PLR0904
|
|||||||
description = "login_detection(selector_group)",
|
description = "login_detection(selector_group)",
|
||||||
)
|
)
|
||||||
if username in user_info.lower():
|
if username in user_info.lower():
|
||||||
matched_selector_label = (
|
matched_selector_display = (
|
||||||
_LOGIN_DETECTION_SELECTOR_LABELS[matched_selector]
|
f"{_LOGIN_DETECTION_SELECTORS[matched_selector][0].name}={_LOGIN_DETECTION_SELECTORS[matched_selector][1]}"
|
||||||
if 0 <= matched_selector < len(_LOGIN_DETECTION_SELECTOR_LABELS)
|
if 0 <= matched_selector < len(_LOGIN_DETECTION_SELECTORS)
|
||||||
else f"selector_index_{matched_selector}"
|
else f"selector_index_{matched_selector}"
|
||||||
)
|
)
|
||||||
LOG.debug("Login detected via login detection selector '%s'", matched_selector_label)
|
LOG.debug("Login detected via login detection selector '%s'", matched_selector_display)
|
||||||
return True
|
return True
|
||||||
except TimeoutError:
|
except TimeoutError:
|
||||||
LOG.debug("Timeout waiting for login detection selector group after %.1fs", effective_timeout)
|
LOG.debug("Timeout waiting for login detection selector group after %.1fs", effective_timeout)
|
||||||
|
|
||||||
if not include_probe:
|
if await self._has_logged_out_cta():
|
||||||
LOG.debug("No login detected via configured login detection selectors (%s)", tried_login_selectors)
|
|
||||||
return False
|
return False
|
||||||
|
|
||||||
state = await self._auth_probe_login_state()
|
if include_probe:
|
||||||
if state == LoginState.LOGGED_IN:
|
LOG.debug("No login detected via configured login detection selectors (%s); auth probe is disabled", tried_login_selectors)
|
||||||
return True
|
return False
|
||||||
|
|
||||||
LOG.debug(
|
LOG.debug("No login detected via configured login detection selectors (%s)", tried_login_selectors)
|
||||||
"No login detected - DOM login detection selectors (%s) did not confirm login and server probe returned %s",
|
|
||||||
tried_login_selectors,
|
|
||||||
state.name,
|
|
||||||
)
|
|
||||||
return False
|
return False
|
||||||
|
|
||||||
async def _fetch_published_ads(self) -> list[dict[str, Any]]:
|
async def _has_logged_out_cta(self, *, log_timeout:bool = True) -> bool:
|
||||||
|
quick_dom_timeout = self._timeout("quick_dom")
|
||||||
|
tried_logged_out_selectors = _format_login_detection_selectors(_LOGGED_OUT_CTA_SELECTORS)
|
||||||
|
|
||||||
|
try:
|
||||||
|
cta_element, cta_index = await self.web_find_first_available(
|
||||||
|
_LOGGED_OUT_CTA_SELECTORS,
|
||||||
|
timeout = quick_dom_timeout,
|
||||||
|
key = "quick_dom",
|
||||||
|
description = "login_detection(logged_out_cta)",
|
||||||
|
)
|
||||||
|
cta_text = await self._extract_visible_text(cta_element)
|
||||||
|
if cta_text.strip():
|
||||||
|
matched_selector_display = (
|
||||||
|
f"{_LOGGED_OUT_CTA_SELECTORS[cta_index][0].name}={_LOGGED_OUT_CTA_SELECTORS[cta_index][1]}"
|
||||||
|
if 0 <= cta_index < len(_LOGGED_OUT_CTA_SELECTORS)
|
||||||
|
else f"selector_index_{cta_index}"
|
||||||
|
)
|
||||||
|
if 0 <= cta_index < len(_LOGGED_OUT_CTA_SELECTORS):
|
||||||
|
LOG.debug("Fast logged-out pre-check matched selector '%s'", matched_selector_display)
|
||||||
|
return True
|
||||||
|
LOG.debug("Fast logged-out pre-check got unexpected selector index '%s'; failing closed", cta_index)
|
||||||
|
return False
|
||||||
|
except TimeoutError:
|
||||||
|
if log_timeout:
|
||||||
|
LOG.debug(
|
||||||
|
"Fast logged-out pre-check found no login CTA (%s) within %.1fs",
|
||||||
|
tried_logged_out_selectors,
|
||||||
|
quick_dom_timeout,
|
||||||
|
)
|
||||||
|
|
||||||
|
return False
|
||||||
|
|
||||||
|
async def _fetch_published_ads(self, *, strict:bool = False) -> list[dict[str, Any]]:
|
||||||
"""Fetch all published ads, handling API pagination.
|
"""Fetch all published ads, handling API pagination.
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
@@ -1326,37 +1458,84 @@ class KleinanzeigenBot(WebScrapingMixin): # noqa: PLR0904
|
|||||||
try:
|
try:
|
||||||
response = await self.web_request(f"{self.root_url}/m-meine-anzeigen-verwalten.json?sort=DEFAULT&pageNum={page}")
|
response = await self.web_request(f"{self.root_url}/m-meine-anzeigen-verwalten.json?sort=DEFAULT&pageNum={page}")
|
||||||
except TimeoutError as ex:
|
except TimeoutError as ex:
|
||||||
LOG.warning("Pagination request timed out on page %s: %s", page, ex)
|
if strict:
|
||||||
|
raise
|
||||||
|
LOG.warning("Pagination request failed on page %s: %s", page, ex)
|
||||||
|
break
|
||||||
|
|
||||||
|
if not isinstance(response, dict):
|
||||||
|
if strict:
|
||||||
|
raise TypeError(f"Unexpected pagination response type on page {page}: {type(response).__name__}")
|
||||||
|
LOG.warning("Unexpected pagination response type on page %s: %s", page, type(response).__name__)
|
||||||
break
|
break
|
||||||
|
|
||||||
content = response.get("content", "")
|
content = response.get("content", "")
|
||||||
|
if isinstance(content, bytearray):
|
||||||
|
content = bytes(content)
|
||||||
|
if isinstance(content, bytes):
|
||||||
|
content = content.decode("utf-8", errors = "replace")
|
||||||
|
if not isinstance(content, str):
|
||||||
|
if strict:
|
||||||
|
raise TypeError(f"Unexpected response content type on page {page}: {type(content).__name__}")
|
||||||
|
LOG.warning("Unexpected response content type on page %s: %s", page, type(content).__name__)
|
||||||
|
break
|
||||||
|
|
||||||
try:
|
try:
|
||||||
json_data = json.loads(content)
|
json_data = json.loads(content)
|
||||||
except json.JSONDecodeError as ex:
|
except (json.JSONDecodeError, TypeError) as ex:
|
||||||
if not content:
|
if not content:
|
||||||
|
if strict:
|
||||||
|
raise ValueError(f"Empty JSON response content on page {page}") from ex
|
||||||
LOG.warning("Empty JSON response content on page %s", page)
|
LOG.warning("Empty JSON response content on page %s", page)
|
||||||
break
|
break
|
||||||
|
if strict:
|
||||||
|
raise ValueError(f"Failed to parse JSON response on page {page}: {ex}") from ex
|
||||||
snippet = content[:SNIPPET_LIMIT] + ("..." if len(content) > SNIPPET_LIMIT else "")
|
snippet = content[:SNIPPET_LIMIT] + ("..." if len(content) > SNIPPET_LIMIT else "")
|
||||||
LOG.warning("Failed to parse JSON response on page %s: %s (content: %s)", page, ex, snippet)
|
LOG.warning("Failed to parse JSON response on page %s: %s (content: %s)", page, ex, snippet)
|
||||||
break
|
break
|
||||||
|
|
||||||
if not isinstance(json_data, dict):
|
if not isinstance(json_data, dict):
|
||||||
|
if strict:
|
||||||
|
raise TypeError(f"Unexpected JSON payload type on page {page}: {type(json_data).__name__}")
|
||||||
snippet = content[:SNIPPET_LIMIT] + ("..." if len(content) > SNIPPET_LIMIT else "")
|
snippet = content[:SNIPPET_LIMIT] + ("..." if len(content) > SNIPPET_LIMIT else "")
|
||||||
LOG.warning("Unexpected JSON payload on page %s (content: %s)", page, snippet)
|
LOG.warning("Unexpected JSON payload on page %s (content: %s)", page, snippet)
|
||||||
break
|
break
|
||||||
|
|
||||||
page_ads = json_data.get("ads", [])
|
page_ads = json_data.get("ads", [])
|
||||||
if not isinstance(page_ads, list):
|
if not isinstance(page_ads, list):
|
||||||
|
if strict:
|
||||||
|
raise TypeError(f"Unexpected 'ads' type on page {page}: {type(page_ads).__name__}")
|
||||||
preview = str(page_ads)
|
preview = str(page_ads)
|
||||||
if len(preview) > SNIPPET_LIMIT:
|
if len(preview) > SNIPPET_LIMIT:
|
||||||
preview = preview[:SNIPPET_LIMIT] + "..."
|
preview = preview[:SNIPPET_LIMIT] + "..."
|
||||||
LOG.warning("Unexpected 'ads' type on page %s: %s value: %s", page, type(page_ads).__name__, preview)
|
LOG.warning("Unexpected 'ads' type on page %s: %s value: %s", page, type(page_ads).__name__, preview)
|
||||||
break
|
break
|
||||||
|
|
||||||
ads.extend(page_ads)
|
filtered_page_ads:list[dict[str, Any]] = []
|
||||||
|
rejected_count = 0
|
||||||
|
rejected_preview:str | None = None
|
||||||
|
for entry in page_ads:
|
||||||
|
if isinstance(entry, dict):
|
||||||
|
filtered_page_ads.append(entry)
|
||||||
|
continue
|
||||||
|
rejected_count += 1
|
||||||
|
if strict:
|
||||||
|
raise TypeError(f"Unexpected ad entry type on page {page}: {type(entry).__name__}")
|
||||||
|
if rejected_preview is None:
|
||||||
|
rejected_preview = repr(entry)
|
||||||
|
|
||||||
|
if rejected_count > 0:
|
||||||
|
preview = rejected_preview or "<none>"
|
||||||
|
if len(preview) > SNIPPET_LIMIT:
|
||||||
|
preview = preview[:SNIPPET_LIMIT] + "..."
|
||||||
|
LOG.warning("Filtered %s malformed ad entries on page %s (sample: %s)", rejected_count, page, preview)
|
||||||
|
|
||||||
|
ads.extend(filtered_page_ads)
|
||||||
|
|
||||||
paging = json_data.get("paging")
|
paging = json_data.get("paging")
|
||||||
if not isinstance(paging, dict):
|
if not isinstance(paging, dict):
|
||||||
|
if strict:
|
||||||
|
raise ValueError(f"Missing or invalid paging info on page {page}: {type(paging).__name__}")
|
||||||
LOG.debug("No paging dict found on page %s, assuming single page", page)
|
LOG.debug("No paging dict found on page %s, assuming single page", page)
|
||||||
break
|
break
|
||||||
|
|
||||||
@@ -1365,10 +1544,14 @@ class KleinanzeigenBot(WebScrapingMixin): # noqa: PLR0904
|
|||||||
total_pages = misc.coerce_page_number(paging.get("last"))
|
total_pages = misc.coerce_page_number(paging.get("last"))
|
||||||
|
|
||||||
if current_page_num is None:
|
if current_page_num is None:
|
||||||
|
if strict:
|
||||||
|
raise ValueError(f"Invalid 'pageNum' in paging info: {paging.get('pageNum')}")
|
||||||
LOG.warning("Invalid 'pageNum' in paging info: %s, stopping pagination", paging.get("pageNum"))
|
LOG.warning("Invalid 'pageNum' in paging info: %s, stopping pagination", paging.get("pageNum"))
|
||||||
break
|
break
|
||||||
|
|
||||||
if total_pages is None:
|
if total_pages is None:
|
||||||
|
if strict:
|
||||||
|
raise ValueError("No pagination info found")
|
||||||
LOG.debug("No pagination info found, assuming single page")
|
LOG.debug("No pagination info found, assuming single page")
|
||||||
break
|
break
|
||||||
|
|
||||||
@@ -1387,6 +1570,8 @@ class KleinanzeigenBot(WebScrapingMixin): # noqa: PLR0904
|
|||||||
# Use API's next field for navigation (more robust than our counter)
|
# Use API's next field for navigation (more robust than our counter)
|
||||||
next_page = misc.coerce_page_number(paging.get("next"))
|
next_page = misc.coerce_page_number(paging.get("next"))
|
||||||
if next_page is None:
|
if next_page is None:
|
||||||
|
if strict:
|
||||||
|
raise ValueError(f"Invalid 'next' page value in paging info: {paging.get('next')}")
|
||||||
LOG.warning("Invalid 'next' page value in paging info: %s, stopping pagination", paging.get("next"))
|
LOG.warning("Invalid 'next' page value in paging info: %s, stopping pagination", paging.get("next"))
|
||||||
break
|
break
|
||||||
page = next_page
|
page = next_page
|
||||||
@@ -1554,6 +1739,28 @@ class KleinanzeigenBot(WebScrapingMixin): # noqa: PLR0904
|
|||||||
# Check for success messages
|
# Check for success messages
|
||||||
return await self.web_check(By.ID, "checking-done", Is.DISPLAYED) or await self.web_check(By.ID, "not-completed", Is.DISPLAYED)
|
return await self.web_check(By.ID, "checking-done", Is.DISPLAYED) or await self.web_check(By.ID, "not-completed", Is.DISPLAYED)
|
||||||
|
|
||||||
|
async def _detect_new_published_ad_ids(self, ads_before_publish:set[str], ad_title:str) -> set[str] | None:
|
||||||
|
try:
|
||||||
|
current_ads = await self._fetch_published_ads(strict = True)
|
||||||
|
current_ad_ids:set[str] = set()
|
||||||
|
for current_ad in current_ads:
|
||||||
|
if not isinstance(current_ad, dict):
|
||||||
|
# Keep duplicate-prevention verification fail-closed: malformed entries
|
||||||
|
# must abort retries rather than risk creating duplicate listings.
|
||||||
|
entry_length = len(current_ad) if hasattr(current_ad, "__len__") else None
|
||||||
|
LOG.debug("Malformed ad entry in strict duplicate verification: type=%s length=%s", type(current_ad).__name__, entry_length)
|
||||||
|
raise TypeError(f"Unexpected ad entry type: {type(current_ad).__name__}")
|
||||||
|
if current_ad.get("id"):
|
||||||
|
current_ad_ids.add(str(current_ad["id"]))
|
||||||
|
except Exception as ex: # noqa: BLE001
|
||||||
|
LOG.warning(
|
||||||
|
"Could not verify published ads after failed attempt for '%s': %s -- aborting retries to prevent duplicates.",
|
||||||
|
ad_title,
|
||||||
|
ex,
|
||||||
|
)
|
||||||
|
return None
|
||||||
|
return current_ad_ids - ads_before_publish
|
||||||
|
|
||||||
async def publish_ads(self, ad_cfgs:list[tuple[str, Ad, dict[str, Any]]]) -> None:
|
async def publish_ads(self, ad_cfgs:list[tuple[str, Ad, dict[str, Any]]]) -> None:
|
||||||
count = 0
|
count = 0
|
||||||
failed_count = 0
|
failed_count = 0
|
||||||
@@ -1589,34 +1796,33 @@ class KleinanzeigenBot(WebScrapingMixin): # noqa: PLR0904
|
|||||||
raise # Respect task cancellation
|
raise # Respect task cancellation
|
||||||
except (TimeoutError, ProtocolException) as ex:
|
except (TimeoutError, ProtocolException) as ex:
|
||||||
await self._capture_publish_error_diagnostics_if_enabled(ad_cfg, ad_cfg_orig, ad_file, attempt, ex)
|
await self._capture_publish_error_diagnostics_if_enabled(ad_cfg, ad_cfg_orig, ad_file, attempt, ex)
|
||||||
if attempt < max_retries:
|
if attempt >= max_retries:
|
||||||
# Before retrying, check if the ad was already created despite the error.
|
|
||||||
# A partially successful submission followed by a retry would create a duplicate listing,
|
|
||||||
# which violates kleinanzeigen.de terms of service and can lead to account suspension.
|
|
||||||
try:
|
|
||||||
current_ads = await self._fetch_published_ads()
|
|
||||||
current_ad_ids = {str(x["id"]) for x in current_ads if x.get("id")}
|
|
||||||
new_ad_ids = current_ad_ids - ads_before_publish
|
|
||||||
if new_ad_ids:
|
|
||||||
LOG.warning(
|
|
||||||
"Attempt %s/%s failed for '%s': %s. "
|
|
||||||
"However, a new ad was detected (id: %s) -- aborting retries to prevent duplicates.",
|
|
||||||
attempt, max_retries, ad_cfg.title, ex, ", ".join(new_ad_ids)
|
|
||||||
)
|
|
||||||
failed_count += 1
|
|
||||||
break
|
|
||||||
except Exception as verify_ex: # noqa: BLE001
|
|
||||||
LOG.warning(
|
|
||||||
"Could not verify published ads after failed attempt for '%s': %s -- aborting retries to prevent duplicates.",
|
|
||||||
ad_cfg.title, verify_ex,
|
|
||||||
)
|
|
||||||
failed_count += 1
|
|
||||||
break
|
|
||||||
LOG.warning("Attempt %s/%s failed for '%s': %s. Retrying...", attempt, max_retries, ad_cfg.title, ex)
|
|
||||||
await self.web_sleep(2) # Wait before retry
|
|
||||||
else:
|
|
||||||
LOG.error("All %s attempts failed for '%s': %s. Skipping ad.", max_retries, ad_cfg.title, ex)
|
LOG.error("All %s attempts failed for '%s': %s. Skipping ad.", max_retries, ad_cfg.title, ex)
|
||||||
failed_count += 1
|
failed_count += 1
|
||||||
|
continue
|
||||||
|
|
||||||
|
# Before retrying, check if the ad was already created despite the error.
|
||||||
|
# A partially successful submission followed by a retry would create a duplicate listing,
|
||||||
|
# which violates kleinanzeigen.de terms of service and can lead to account suspension.
|
||||||
|
new_ad_ids = await self._detect_new_published_ad_ids(ads_before_publish, ad_cfg.title)
|
||||||
|
if new_ad_ids is None:
|
||||||
|
failed_count += 1
|
||||||
|
break
|
||||||
|
if new_ad_ids:
|
||||||
|
LOG.warning(
|
||||||
|
"Attempt %s/%s failed for '%s': %s. "
|
||||||
|
"However, a new ad was detected (id: %s) -- aborting retries to prevent duplicates.",
|
||||||
|
attempt,
|
||||||
|
max_retries,
|
||||||
|
ad_cfg.title,
|
||||||
|
ex,
|
||||||
|
", ".join(new_ad_ids),
|
||||||
|
)
|
||||||
|
failed_count += 1
|
||||||
|
break
|
||||||
|
|
||||||
|
LOG.warning("Attempt %s/%s failed for '%s': %s. Retrying...", attempt, max_retries, ad_cfg.title, ex)
|
||||||
|
await self.web_sleep(2_000) # Wait before retry
|
||||||
|
|
||||||
# Check publishing result separately (no retry - ad is already submitted)
|
# Check publishing result separately (no retry - ad is already submitted)
|
||||||
if success:
|
if success:
|
||||||
@@ -1640,10 +1846,10 @@ class KleinanzeigenBot(WebScrapingMixin): # noqa: PLR0904
|
|||||||
self, ad_file:str, ad_cfg:Ad, ad_cfg_orig:dict[str, Any], published_ads:list[dict[str, Any]], mode:AdUpdateStrategy = AdUpdateStrategy.REPLACE
|
self, ad_file:str, ad_cfg:Ad, ad_cfg_orig:dict[str, Any], published_ads:list[dict[str, Any]], mode:AdUpdateStrategy = AdUpdateStrategy.REPLACE
|
||||||
) -> None:
|
) -> None:
|
||||||
"""
|
"""
|
||||||
@param ad_cfg: the effective ad config (i.e. with default values applied etc.)
|
@ param ad_cfg: the effective ad config(i.e. with default values applied etc.)
|
||||||
@param ad_cfg_orig: the ad config as present in the YAML file
|
@ param ad_cfg_orig: the ad config as present in the YAML file
|
||||||
@param published_ads: json list of published ads
|
@ param published_ads: json list of published ads
|
||||||
@param mode: the mode of ad editing, either publishing a new or updating an existing ad
|
@ param mode: the mode of ad editing, either publishing a new or updating an existing ad
|
||||||
"""
|
"""
|
||||||
|
|
||||||
if mode == AdUpdateStrategy.REPLACE:
|
if mode == AdUpdateStrategy.REPLACE:
|
||||||
@@ -2256,7 +2462,7 @@ class KleinanzeigenBot(WebScrapingMixin): # noqa: PLR0904
|
|||||||
async def download_ads(self) -> None:
|
async def download_ads(self) -> None:
|
||||||
"""
|
"""
|
||||||
Determines which download mode was chosen with the arguments, and calls the specified download routine.
|
Determines which download mode was chosen with the arguments, and calls the specified download routine.
|
||||||
This downloads either all, only unsaved (new), or specific ads given by ID.
|
This downloads either all, only unsaved(new), or specific ads given by ID.
|
||||||
"""
|
"""
|
||||||
# Fetch published ads once from manage-ads JSON to avoid repetitive API calls during extraction
|
# Fetch published ads once from manage-ads JSON to avoid repetitive API calls during extraction
|
||||||
# Build lookup dict inline and pass directly to extractor (no cache abstraction needed)
|
# Build lookup dict inline and pass directly to extractor (no cache abstraction needed)
|
||||||
@@ -2345,10 +2551,10 @@ class KleinanzeigenBot(WebScrapingMixin): # noqa: PLR0904
|
|||||||
def __get_description(self, ad_cfg:Ad, *, with_affixes:bool) -> str:
|
def __get_description(self, ad_cfg:Ad, *, with_affixes:bool) -> str:
|
||||||
"""Get the ad description optionally with prefix and suffix applied.
|
"""Get the ad description optionally with prefix and suffix applied.
|
||||||
|
|
||||||
Precedence (highest to lowest):
|
Precedence(highest to lowest):
|
||||||
1. Direct ad-level affixes (description_prefix/suffix)
|
1. Direct ad - level affixes(description_prefix / suffix)
|
||||||
2. Global flattened affixes (ad_defaults.description_prefix/suffix)
|
2. Global flattened affixes(ad_defaults.description_prefix / suffix)
|
||||||
3. Legacy global nested affixes (ad_defaults.description.prefix/suffix)
|
3. Legacy global nested affixes(ad_defaults.description.prefix / suffix)
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
ad_cfg: The ad configuration dictionary
|
ad_cfg: The ad configuration dictionary
|
||||||
@@ -2420,8 +2626,8 @@ def main(args:list[str]) -> None:
|
|||||||
print(
|
print(
|
||||||
textwrap.dedent(rf"""
|
textwrap.dedent(rf"""
|
||||||
_ _ _ _ _ _
|
_ _ _ _ _ _
|
||||||
| | _| | ___(_)_ __ __ _ _ __ _______(_) __ _ ___ _ __ | |__ ___ | |_
|
| | _ | | ___(_)_ __ __ _ _ __ _______(_) __ _ ___ _ __ | |__ ___ | |_
|
||||||
| |/ / |/ _ \ | '_ \ / _` | '_ \|_ / _ \ |/ _` |/ _ \ '_ \ ____| '_ \ / _ \| __|
|
| | / / | / _ \ | '_ \ / _` | '_ \|_ / _ \ |/ _` |/ _ \ '_ \ ____| '_ \ / _ \| __|
|
||||||
| <| | __/ | | | | (_| | | | |/ / __/ | (_| | __/ | | |____| |_) | (_) | |_
|
| <| | __/ | | | | (_| | | | |/ / __/ | (_| | __/ | | |____| |_) | (_) | |_
|
||||||
|_|\_\_|\___|_|_| |_|\__,_|_| |_/___\___|_|\__, |\___|_| |_| |_.__/ \___/ \__|
|
|_|\_\_|\___|_|_| |_|\__,_|_| |_/___\___|_|\__, |\___|_| |_| |_.__/ \___/ \__|
|
||||||
|___/
|
|___/
|
||||||
|
|||||||
@@ -37,9 +37,12 @@ kleinanzeigen_bot/__init__.py:
|
|||||||
"Empty JSON response content on page %s": "Leerer JSON-Antwortinhalt auf Seite %s"
|
"Empty JSON response content on page %s": "Leerer JSON-Antwortinhalt auf Seite %s"
|
||||||
"Failed to parse JSON response on page %s: %s (content: %s)": "Fehler beim Parsen der JSON-Antwort auf Seite %s: %s (Inhalt: %s)"
|
"Failed to parse JSON response on page %s: %s (content: %s)": "Fehler beim Parsen der JSON-Antwort auf Seite %s: %s (Inhalt: %s)"
|
||||||
"Stopping pagination after %s pages to avoid infinite loop": "Stoppe die Seitenaufschaltung nach %s Seiten, um eine Endlosschleife zu vermeiden"
|
"Stopping pagination after %s pages to avoid infinite loop": "Stoppe die Seitenaufschaltung nach %s Seiten, um eine Endlosschleife zu vermeiden"
|
||||||
"Pagination request timed out on page %s: %s": "Zeitueberschreitung bei der Seitenabfrage auf Seite %s: %s"
|
"Pagination request failed on page %s: %s": "Seitenabfrage auf Seite %s fehlgeschlagen: %s"
|
||||||
|
"Unexpected pagination response type on page %s: %s": "Unerwarteter Typ der Paginierungsantwort auf Seite %s: %s"
|
||||||
|
"Unexpected response content type on page %s: %s": "Unerwarteter Antwortinhalt-Typ auf Seite %s: %s"
|
||||||
"Unexpected JSON payload on page %s (content: %s)": "Unerwartete JSON-Antwort auf Seite %s (Inhalt: %s)"
|
"Unexpected JSON payload on page %s (content: %s)": "Unerwartete JSON-Antwort auf Seite %s (Inhalt: %s)"
|
||||||
"Unexpected 'ads' type on page %s: %s value: %s": "Unerwarteter 'ads'-Typ auf Seite %s: %s Wert: %s"
|
"Unexpected 'ads' type on page %s: %s value: %s": "Unerwarteter 'ads'-Typ auf Seite %s: %s Wert: %s"
|
||||||
|
"Filtered %s malformed ad entries on page %s (sample: %s)": "%s fehlerhafte Anzeigen-Einträge auf Seite %s gefiltert (Beispiel: %s)"
|
||||||
"Reached last page %s of %s, stopping pagination": "Letzte Seite %s von %s erreicht, beende Paginierung"
|
"Reached last page %s of %s, stopping pagination": "Letzte Seite %s von %s erreicht, beende Paginierung"
|
||||||
"No ads found on page %s, stopping pagination": "Keine Anzeigen auf Seite %s gefunden, beende Paginierung"
|
"No ads found on page %s, stopping pagination": "Keine Anzeigen auf Seite %s gefunden, beende Paginierung"
|
||||||
"Invalid 'next' page value in paging info: %s, stopping pagination": "Ungültiger 'next'-Seitenwert in Paginierungsinfo: %s, beende Paginierung"
|
"Invalid 'next' page value in paging info: %s, stopping pagination": "Ungültiger 'next'-Seitenwert in Paginierungsinfo: %s, beende Paginierung"
|
||||||
@@ -86,14 +89,36 @@ kleinanzeigen_bot/__init__.py:
|
|||||||
|
|
||||||
login:
|
login:
|
||||||
"Checking if already logged in...": "Überprüfe, ob bereits eingeloggt..."
|
"Checking if already logged in...": "Überprüfe, ob bereits eingeloggt..."
|
||||||
"Current page URL after opening homepage: %s": "Aktuelle Seiten-URL nach dem Öffnen der Startseite: %s"
|
"Already logged in. Skipping login.": "Bereits eingeloggt. Überspringe Anmeldung."
|
||||||
"Already logged in as [%s]. Skipping login.": "Bereits eingeloggt als [%s]. Überspringe Anmeldung."
|
"Navigating to SSO login page (Auth0)...": "Navigiere zur SSO-Anmeldeseite (Auth0)..."
|
||||||
"Opening login page...": "Öffne Anmeldeseite..."
|
"Timeout navigating to SSO login page after %.1fs": "Zeitüberschreitung beim Navigieren zur SSO-Anmeldeseite nach %.1fs"
|
||||||
"Login state is UNKNOWN - cannot determine if already logged in. Skipping login attempt.": "Login-Status ist UNKNOWN - kann nicht bestimmt werden, ob bereits eingeloggt ist. Überspringe Anmeldeversuch."
|
"Login confirmed.": "Anmeldung bestätigt."
|
||||||
"Login state is UNKNOWN after first login attempt - cannot determine login status. Aborting login process.": "Login-Status ist UNKNOWN nach dem ersten Anmeldeversuch - kann Login-Status nicht bestimmen. Breche Anmeldeprozess ab."
|
"Login state after attempt is %s (url=%s)": "Login-Status nach dem Versuch ist %s (URL=%s)"
|
||||||
"First login attempt did not succeed, trying second login attempt": "Erster Anmeldeversuch war nicht erfolgreich, versuche zweiten Anmeldeversuch"
|
"Login could not be confirmed after Auth0 flow (state=%s, url=%s)": "Anmeldung nach Auth0-Flow konnte nicht bestätigt werden (Status=%s, URL=%s)"
|
||||||
"Second login attempt succeeded": "Zweiter Anmeldeversuch erfolgreich"
|
|
||||||
"Second login attempt also failed - login may not have succeeded": "Zweiter Anmeldeversuch ebenfalls fehlgeschlagen - Anmeldung möglicherweise nicht erfolgreich"
|
_wait_for_auth0_login_context:
|
||||||
|
"Auth0 redirect not detected (url=%s)": "Auth0-Weiterleitung nicht erkannt (URL=%s)"
|
||||||
|
|
||||||
|
_wait_for_auth0_password_step:
|
||||||
|
"Auth0 password step not reached (url=%s)": "Auth0-Passwortschritt nicht erreicht (URL=%s)"
|
||||||
|
|
||||||
|
_wait_for_post_auth0_submit_transition:
|
||||||
|
"Auth0 post-submit verification remained inconclusive (url=%s)": "Auth0-Verifikation nach Absenden blieb unklar (URL=%s)"
|
||||||
|
|
||||||
|
fill_login_data_and_send:
|
||||||
|
"Logging in...": "Anmeldung..."
|
||||||
|
"Auth0 Step 1: entering email...": "Auth0 Schritt 1: E-Mail wird eingegeben..."
|
||||||
|
"Waiting for Auth0 password page...": "Warte auf Auth0-Passwortseite..."
|
||||||
|
"Auth0 Step 2: entering password...": "Auth0 Schritt 2: Passwort wird eingegeben..."
|
||||||
|
"Auth0 login submitted.": "Auth0-Anmeldung abgesendet."
|
||||||
|
|
||||||
|
_check_sms_verification:
|
||||||
|
"# Device verification message detected. Please follow the instruction displayed in the Browser.": "# Nachricht zur Geräteverifizierung erkannt. Bitte den Anweisungen im Browser folgen."
|
||||||
|
"Press ENTER when done...": "EINGABETASTE drücken, wenn erledigt..."
|
||||||
|
|
||||||
|
_check_email_verification:
|
||||||
|
"# Device verification message detected. Please follow the instruction displayed in the Browser.": "# Nachricht zur Geräteverifizierung erkannt. Bitte den Anweisungen im Browser folgen."
|
||||||
|
"Press ENTER when done...": "EINGABETASTE drücken, wenn erledigt..."
|
||||||
|
|
||||||
is_logged_in:
|
is_logged_in:
|
||||||
"Starting login detection (timeout: %.1fs base, %.1fs effective with multiplier/backoff)": "Starte Login-Erkennung (Timeout: %.1fs Basis, %.1fs effektiv mit Multiplikator/Backoff)"
|
"Starting login detection (timeout: %.1fs base, %.1fs effective with multiplier/backoff)": "Starte Login-Erkennung (Timeout: %.1fs Basis, %.1fs effektiv mit Multiplikator/Backoff)"
|
||||||
@@ -101,8 +126,6 @@ kleinanzeigen_bot/__init__.py:
|
|||||||
"Timeout waiting for login detection selector group after %.1fs": "Timeout beim Warten auf die Login-Erkennungs-Selektorgruppe nach %.1fs"
|
"Timeout waiting for login detection selector group after %.1fs": "Timeout beim Warten auf die Login-Erkennungs-Selektorgruppe nach %.1fs"
|
||||||
|
|
||||||
handle_after_login_logic:
|
handle_after_login_logic:
|
||||||
"# Device verification message detected. Please follow the instruction displayed in the Browser.": "# Nachricht zur Geräteverifizierung erkannt. Bitte den Anweisungen im Browser folgen."
|
|
||||||
"Press ENTER when done...": "EINGABETASTE drücken, wenn erledigt..."
|
|
||||||
"Handling GDPR disclaimer...": "Verarbeite DSGVO-Hinweis..."
|
"Handling GDPR disclaimer...": "Verarbeite DSGVO-Hinweis..."
|
||||||
|
|
||||||
delete_ads:
|
delete_ads:
|
||||||
@@ -156,11 +179,14 @@ kleinanzeigen_bot/__init__.py:
|
|||||||
"Attempt %s/%s failed for '%s': %s. Retrying...": "Versuch %s/%s fehlgeschlagen für '%s': %s. Erneuter Versuch..."
|
"Attempt %s/%s failed for '%s': %s. Retrying...": "Versuch %s/%s fehlgeschlagen für '%s': %s. Erneuter Versuch..."
|
||||||
"Attempt %s/%s failed for '%s': %s. However, a new ad was detected (id: %s) -- aborting retries to prevent duplicates.": "Versuch %s/%s fehlgeschlagen für '%s': %s. Jedoch wurde eine neue Anzeige erkannt (ID: %s) -- Wiederholungen werden abgebrochen, um Duplikate zu vermeiden."
|
"Attempt %s/%s failed for '%s': %s. However, a new ad was detected (id: %s) -- aborting retries to prevent duplicates.": "Versuch %s/%s fehlgeschlagen für '%s': %s. Jedoch wurde eine neue Anzeige erkannt (ID: %s) -- Wiederholungen werden abgebrochen, um Duplikate zu vermeiden."
|
||||||
"Could not fetch fresh published-ads baseline for '%s': %s. Falling back to initial snapshot.": "Konnte keine aktuelle Anzeigen-Baseline für '%s' abrufen: %s. Verwende initialen Snapshot."
|
"Could not fetch fresh published-ads baseline for '%s': %s. Falling back to initial snapshot.": "Konnte keine aktuelle Anzeigen-Baseline für '%s' abrufen: %s. Verwende initialen Snapshot."
|
||||||
"Could not verify published ads after failed attempt for '%s': %s -- aborting retries to prevent duplicates.": "Veröffentlichte Anzeigen konnten nach fehlgeschlagenem Versuch für '%s' nicht geprüft werden: %s -- Wiederholungen werden abgebrochen, um Duplikate zu vermeiden."
|
|
||||||
"All %s attempts failed for '%s': %s. Skipping ad.": "Alle %s Versuche fehlgeschlagen für '%s': %s. Überspringe Anzeige."
|
"All %s attempts failed for '%s': %s. Skipping ad.": "Alle %s Versuche fehlgeschlagen für '%s': %s. Überspringe Anzeige."
|
||||||
"DONE: (Re-)published %s (%s failed after retries)": "FERTIG: %s (erneut) veröffentlicht (%s fehlgeschlagen nach Wiederholungen)"
|
"DONE: (Re-)published %s (%s failed after retries)": "FERTIG: %s (erneut) veröffentlicht (%s fehlgeschlagen nach Wiederholungen)"
|
||||||
"DONE: (Re-)published %s": "FERTIG: %s (erneut) veröffentlicht"
|
"DONE: (Re-)published %s": "FERTIG: %s (erneut) veröffentlicht"
|
||||||
"ad": "Anzeige"
|
"ad": "Anzeige"
|
||||||
|
|
||||||
|
_detect_new_published_ad_ids:
|
||||||
|
"Could not verify published ads after failed attempt for '%s': %s -- aborting retries to prevent duplicates.": "Veröffentlichte Anzeigen konnten nach fehlgeschlagenem Versuch für '%s' nicht geprüft werden: %s -- Wiederholungen werden abgebrochen, um Duplikate zu vermeiden."
|
||||||
|
|
||||||
apply_auto_price_reduction:
|
apply_auto_price_reduction:
|
||||||
"Auto price reduction is enabled for [%s] but no price is configured.": "Automatische Preisreduzierung ist für [%s] aktiviert, aber es wurde kein Preis konfiguriert."
|
"Auto price reduction is enabled for [%s] but no price is configured.": "Automatische Preisreduzierung ist für [%s] aktiviert, aber es wurde kein Preis konfiguriert."
|
||||||
"Auto price reduction is enabled for [%s] but min_price equals price (%s) - no reductions will occur.": "Automatische Preisreduzierung ist für [%s] aktiviert, aber min_price entspricht dem Preis (%s) - es werden keine Reduktionen auftreten."
|
"Auto price reduction is enabled for [%s] but min_price equals price (%s) - no reductions will occur.": "Automatische Preisreduzierung ist für [%s] aktiviert, aber min_price entspricht dem Preis (%s) - es werden keine Reduktionen auftreten."
|
||||||
@@ -264,9 +290,6 @@ kleinanzeigen_bot/__init__.py:
|
|||||||
"Unknown command: %s": "Unbekannter Befehl: %s"
|
"Unknown command: %s": "Unbekannter Befehl: %s"
|
||||||
"Timing collector flush failed: %s": "Zeitmessdaten konnten nicht gespeichert werden: %s"
|
"Timing collector flush failed: %s": "Zeitmessdaten konnten nicht gespeichert werden: %s"
|
||||||
|
|
||||||
fill_login_data_and_send:
|
|
||||||
"Logging in as [%s]...": "Anmeldung als [%s]..."
|
|
||||||
|
|
||||||
__set_shipping:
|
__set_shipping:
|
||||||
"Unable to close shipping dialog!": "Versanddialog konnte nicht geschlossen werden!"
|
"Unable to close shipping dialog!": "Versanddialog konnte nicht geschlossen werden!"
|
||||||
|
|
||||||
|
|||||||
@@ -1,7 +1,7 @@
|
|||||||
# SPDX-FileCopyrightText: © Jens Bergmann and contributors
|
# SPDX-FileCopyrightText: © Jens Bergmann and contributors
|
||||||
# SPDX-License-Identifier: AGPL-3.0-or-later
|
# SPDX-License-Identifier: AGPL-3.0-or-later
|
||||||
# SPDX-ArtifactOfProjectHomePage: https://github.com/Second-Hand-Friends/kleinanzeigen-bot/
|
# SPDX-ArtifactOfProjectHomePage: https://github.com/Second-Hand-Friends/kleinanzeigen-bot/
|
||||||
import copy, fnmatch, io, json, logging, os, tempfile # isort: skip
|
import asyncio, copy, fnmatch, io, json, logging, os, tempfile # isort: skip
|
||||||
from collections.abc import Callable, Generator
|
from collections.abc import Callable, Generator
|
||||||
from contextlib import redirect_stdout
|
from contextlib import redirect_stdout
|
||||||
from datetime import timedelta
|
from datetime import timedelta
|
||||||
@@ -442,7 +442,12 @@ class TestKleinanzeigenBotAuthentication:
|
|||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
async def test_is_logged_in_returns_true_when_logged_in(self, test_bot:KleinanzeigenBot) -> None:
|
async def test_is_logged_in_returns_true_when_logged_in(self, test_bot:KleinanzeigenBot) -> None:
|
||||||
"""Verify that login check returns true when logged in."""
|
"""Verify that login check returns true when logged in."""
|
||||||
with patch.object(test_bot, "web_text_first_available", new_callable = AsyncMock, return_value = ("Welcome dummy_user", 0)):
|
with patch.object(
|
||||||
|
test_bot,
|
||||||
|
"web_text_first_available",
|
||||||
|
new_callable = AsyncMock,
|
||||||
|
return_value = ("Welcome dummy_user", 0),
|
||||||
|
):
|
||||||
assert await test_bot.is_logged_in() is True
|
assert await test_bot.is_logged_in() is True
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
@@ -460,45 +465,96 @@ class TestKleinanzeigenBotAuthentication:
|
|||||||
async def test_is_logged_in_returns_false_when_not_logged_in(self, test_bot:KleinanzeigenBot) -> None:
|
async def test_is_logged_in_returns_false_when_not_logged_in(self, test_bot:KleinanzeigenBot) -> None:
|
||||||
"""Verify that login check returns false when not logged in."""
|
"""Verify that login check returns false when not logged in."""
|
||||||
with (
|
with (
|
||||||
patch.object(test_bot, "web_text_first_available", side_effect = TimeoutError),
|
|
||||||
patch.object(
|
patch.object(
|
||||||
test_bot,
|
test_bot,
|
||||||
"web_request",
|
"web_text_first_available",
|
||||||
new_callable = AsyncMock,
|
new_callable = AsyncMock,
|
||||||
return_value = {"statusCode": 200, "content": "<html><a href='/m-einloggen.html'>login</a></html>"},
|
side_effect = [("nicht-eingeloggt", 0), ("kein user signal", 0)],
|
||||||
),
|
),
|
||||||
|
patch.object(test_bot, "_has_logged_out_cta", new_callable = AsyncMock, return_value = False),
|
||||||
):
|
):
|
||||||
assert await test_bot.is_logged_in() is False
|
assert await test_bot.is_logged_in() is False
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
async def test_is_logged_in_uses_selector_group_timeout_key(self, test_bot:KleinanzeigenBot) -> None:
|
async def test_has_logged_out_cta_requires_visible_candidate(self, test_bot:KleinanzeigenBot) -> None:
|
||||||
"""Verify login detection uses selector-group lookup with login_detection timeout key."""
|
matched_element = MagicMock(spec = Element)
|
||||||
with patch.object(test_bot, "web_text_first_available", new_callable = AsyncMock, return_value = ("Welcome dummy_user", 0)) as group_text:
|
with (
|
||||||
assert await test_bot.is_logged_in(include_probe = False) is True
|
patch.object(test_bot, "web_find_first_available", new_callable = AsyncMock, return_value = (matched_element, 0)),
|
||||||
|
patch.object(test_bot, "_extract_visible_text", new_callable = AsyncMock, return_value = ""),
|
||||||
group_text.assert_awaited_once()
|
):
|
||||||
call_args = group_text.await_args
|
assert await test_bot._has_logged_out_cta() is False
|
||||||
assert call_args is not None
|
|
||||||
assert call_args.args[0] == [(By.CLASS_NAME, "mr-medium"), (By.ID, "user-email")]
|
|
||||||
assert call_args.kwargs["key"] == "login_detection"
|
|
||||||
assert call_args.kwargs["timeout"] == test_bot._timeout("login_detection")
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
async def test_is_logged_in_logs_selector_label_without_raw_selector_literals(
|
async def test_has_logged_out_cta_accepts_visible_candidate(self, test_bot:KleinanzeigenBot) -> None:
|
||||||
|
matched_element = MagicMock(spec = Element)
|
||||||
|
with (
|
||||||
|
patch.object(test_bot, "web_find_first_available", new_callable = AsyncMock, return_value = (matched_element, 0)),
|
||||||
|
patch.object(test_bot, "_extract_visible_text", new_callable = AsyncMock, return_value = "Einloggen"),
|
||||||
|
):
|
||||||
|
assert await test_bot._has_logged_out_cta() is True
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_is_logged_in_uses_selector_group_timeout_key(self, test_bot:KleinanzeigenBot) -> None:
|
||||||
|
"""Verify login detection uses selector-group lookup with login_detection timeout key."""
|
||||||
|
with patch.object(
|
||||||
|
test_bot,
|
||||||
|
"web_text_first_available",
|
||||||
|
new_callable = AsyncMock,
|
||||||
|
side_effect = [TimeoutError(), ("Welcome dummy_user", 0)],
|
||||||
|
) as group_text:
|
||||||
|
assert await test_bot.is_logged_in(include_probe = False) is True
|
||||||
|
|
||||||
|
group_text.assert_awaited()
|
||||||
|
assert any(call.kwargs.get("timeout") == test_bot._timeout("login_detection") for call in group_text.await_args_list)
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_is_logged_in_runs_full_selector_group_before_cta_precheck(self, test_bot:KleinanzeigenBot) -> None:
|
||||||
|
"""Quick CTA checks must not short-circuit before full logged-in selector checks."""
|
||||||
|
with patch.object(
|
||||||
|
test_bot,
|
||||||
|
"web_text_first_available",
|
||||||
|
new_callable = AsyncMock,
|
||||||
|
side_effect = [TimeoutError(), ("Welcome dummy_user", 0)],
|
||||||
|
) as group_text:
|
||||||
|
assert await test_bot.is_logged_in(include_probe = False) is True
|
||||||
|
|
||||||
|
group_text.assert_awaited()
|
||||||
|
assert group_text.await_count >= 1
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_is_logged_in_short_circuits_before_cta_check_when_quick_user_signal_matches(self, test_bot:KleinanzeigenBot) -> None:
|
||||||
|
"""Logged-in quick pre-check should win even if incidental login links exist elsewhere."""
|
||||||
|
with patch.object(
|
||||||
|
test_bot,
|
||||||
|
"web_text_first_available",
|
||||||
|
new_callable = AsyncMock,
|
||||||
|
return_value = ("angemeldet als: dummy_user", 0),
|
||||||
|
) as group_text:
|
||||||
|
assert await test_bot.is_logged_in(include_probe = False) is True
|
||||||
|
|
||||||
|
group_text.assert_awaited()
|
||||||
|
assert group_text.await_count >= 1
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_is_logged_in_logs_matched_raw_selector(
|
||||||
self, test_bot:KleinanzeigenBot, caplog:pytest.LogCaptureFixture
|
self, test_bot:KleinanzeigenBot, caplog:pytest.LogCaptureFixture
|
||||||
) -> None:
|
) -> None:
|
||||||
"""Login detection logs should reference stable labels, not raw selector values."""
|
"""Login detection logs should show the matched raw selector."""
|
||||||
caplog.set_level("DEBUG")
|
caplog.set_level("DEBUG")
|
||||||
|
|
||||||
with (
|
with (
|
||||||
caplog.at_level("DEBUG"),
|
caplog.at_level("DEBUG"),
|
||||||
patch.object(test_bot, "web_text_first_available", new_callable = AsyncMock, return_value = ("angemeldet als: dummy_user", 1)),
|
patch.object(
|
||||||
|
test_bot,
|
||||||
|
"web_text_first_available",
|
||||||
|
new_callable = AsyncMock,
|
||||||
|
return_value = ("angemeldet als: dummy_user", 0),
|
||||||
|
),
|
||||||
):
|
):
|
||||||
assert await test_bot.is_logged_in(include_probe = False) is True
|
assert await test_bot.is_logged_in(include_probe = False) is True
|
||||||
|
|
||||||
assert "Login detected via login detection selector 'user_info_secondary'" in caplog.text
|
assert "Login detected via login detection selector" in caplog.text
|
||||||
for forbidden in (".mr-medium", "#user-email", "mr-medium", "user-email"):
|
assert "CLASS_NAME=mr-medium" in caplog.text
|
||||||
assert forbidden not in caplog.text
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
async def test_is_logged_in_logs_generic_message_when_selector_group_does_not_match(
|
async def test_is_logged_in_logs_generic_message_when_selector_group_does_not_match(
|
||||||
@@ -509,78 +565,87 @@ class TestKleinanzeigenBotAuthentication:
|
|||||||
|
|
||||||
with (
|
with (
|
||||||
caplog.at_level("DEBUG"),
|
caplog.at_level("DEBUG"),
|
||||||
patch.object(test_bot, "web_text_first_available", side_effect = TimeoutError),
|
patch.object(test_bot, "web_text_first_available", side_effect = [TimeoutError(), TimeoutError()]),
|
||||||
|
patch.object(test_bot, "_has_logged_out_cta", new_callable = AsyncMock, return_value = False),
|
||||||
):
|
):
|
||||||
assert await test_bot.is_logged_in(include_probe = False) is False
|
assert await test_bot.is_logged_in(include_probe = False) is False
|
||||||
|
|
||||||
assert any(
|
assert "No login detected via configured login detection selectors" in caplog.text
|
||||||
record.message == "No login detected via configured login detection selectors (CLASS_NAME=mr-medium, ID=user-email)"
|
assert "CLASS_NAME=mr-medium" in caplog.text
|
||||||
for record in caplog.records
|
assert "ID=user-email" in caplog.text
|
||||||
)
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
async def test_is_logged_in_logs_raw_selectors_when_probe_reports_logged_out(
|
async def test_is_logged_in_logs_raw_selectors_when_dom_checks_fail_and_probe_disabled(
|
||||||
self, test_bot:KleinanzeigenBot, caplog:pytest.LogCaptureFixture
|
self, test_bot:KleinanzeigenBot, caplog:pytest.LogCaptureFixture
|
||||||
) -> None:
|
) -> None:
|
||||||
"""Probe-based final failure should include the tried raw selectors for debugging."""
|
"""Final failure should report selectors and disabled-probe state."""
|
||||||
caplog.set_level("DEBUG")
|
caplog.set_level("DEBUG")
|
||||||
|
|
||||||
with (
|
with (
|
||||||
caplog.at_level("DEBUG"),
|
caplog.at_level("DEBUG"),
|
||||||
patch.object(test_bot, "web_text_first_available", side_effect = TimeoutError),
|
patch.object(test_bot, "web_text_first_available", side_effect = [TimeoutError(), TimeoutError()]),
|
||||||
patch.object(test_bot, "_auth_probe_login_state", new_callable = AsyncMock, return_value = LoginState.LOGGED_OUT),
|
patch.object(test_bot, "_has_logged_out_cta", new_callable = AsyncMock, return_value = False),
|
||||||
):
|
):
|
||||||
assert await test_bot.is_logged_in() is False
|
assert await test_bot.is_logged_in() is False
|
||||||
|
|
||||||
assert any(
|
assert "No login detected via configured login detection selectors" in caplog.text
|
||||||
record.message == (
|
assert "auth probe is disabled" in caplog.text
|
||||||
"No login detected - DOM login detection selectors (CLASS_NAME=mr-medium, ID=user-email) "
|
|
||||||
"did not confirm login and server probe returned LOGGED_OUT"
|
|
||||||
)
|
|
||||||
for record in caplog.records
|
|
||||||
)
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
async def test_get_login_state_prefers_dom_over_auth_probe(self, test_bot:KleinanzeigenBot) -> None:
|
async def test_get_login_state_prefers_dom_checks(self, test_bot:KleinanzeigenBot) -> None:
|
||||||
with (
|
with (
|
||||||
patch.object(test_bot, "web_text_first_available", new_callable = AsyncMock, return_value = ("Welcome dummy_user", 0)) as web_text,
|
|
||||||
patch.object(
|
patch.object(
|
||||||
test_bot, "_auth_probe_login_state", new_callable = AsyncMock, side_effect = AssertionError("Probe must not run when DOM is deterministic")
|
test_bot,
|
||||||
) as probe,
|
"web_text_first_available",
|
||||||
|
new_callable = AsyncMock,
|
||||||
|
return_value = ("Welcome dummy_user", 0),
|
||||||
|
) as web_text,
|
||||||
):
|
):
|
||||||
assert await test_bot.get_login_state() == LoginState.LOGGED_IN
|
assert await test_bot.get_login_state() == LoginState.LOGGED_IN
|
||||||
web_text.assert_awaited_once()
|
web_text.assert_awaited_once()
|
||||||
probe.assert_not_called()
|
|
||||||
|
def test_current_page_url_strips_query_and_fragment(self, test_bot:KleinanzeigenBot) -> None:
|
||||||
|
page = MagicMock()
|
||||||
|
page.url = "https://login.kleinanzeigen.de/u/login/password?state=secret&code=abc#frag"
|
||||||
|
test_bot.page = page
|
||||||
|
|
||||||
|
assert test_bot._current_page_url() == "https://login.kleinanzeigen.de/u/login/password"
|
||||||
|
|
||||||
|
def test_is_valid_post_auth0_destination_filters_invalid_urls(self, test_bot:KleinanzeigenBot) -> None:
|
||||||
|
assert test_bot._is_valid_post_auth0_destination("https://www.kleinanzeigen.de/") is True
|
||||||
|
assert test_bot._is_valid_post_auth0_destination("https://www.kleinanzeigen.de/m-meine-anzeigen.html") is True
|
||||||
|
assert test_bot._is_valid_post_auth0_destination("https://foo.kleinanzeigen.de/") is True
|
||||||
|
assert test_bot._is_valid_post_auth0_destination("unknown") is False
|
||||||
|
assert test_bot._is_valid_post_auth0_destination("about:blank") is False
|
||||||
|
assert test_bot._is_valid_post_auth0_destination("https://evilkleinanzeigen.de/") is False
|
||||||
|
assert test_bot._is_valid_post_auth0_destination("https://kleinanzeigen.de.evil.com/") is False
|
||||||
|
assert test_bot._is_valid_post_auth0_destination("https://login.kleinanzeigen.de/u/login/password") is False
|
||||||
|
assert test_bot._is_valid_post_auth0_destination("https://www.kleinanzeigen.de/login-error-500") is False
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
async def test_get_login_state_falls_back_to_auth_probe_when_dom_inconclusive(self, test_bot:KleinanzeigenBot) -> None:
|
async def test_get_login_state_returns_unknown_when_dom_checks_are_inconclusive(self, test_bot:KleinanzeigenBot) -> None:
|
||||||
with (
|
with (
|
||||||
patch.object(test_bot, "web_text_first_available", side_effect = TimeoutError) as web_text,
|
patch.object(test_bot, "web_text_first_available", side_effect = [TimeoutError(), TimeoutError()]) as web_text,
|
||||||
patch.object(test_bot, "_auth_probe_login_state", new_callable = AsyncMock, return_value = LoginState.LOGGED_IN) as probe,
|
patch.object(test_bot, "web_find_first_available", side_effect = TimeoutError()) as cta_find,
|
||||||
):
|
|
||||||
assert await test_bot.get_login_state() == LoginState.LOGGED_IN
|
|
||||||
web_text.assert_awaited_once()
|
|
||||||
probe.assert_awaited_once()
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
|
||||||
async def test_get_login_state_falls_back_to_auth_probe_when_dom_logged_out(self, test_bot:KleinanzeigenBot) -> None:
|
|
||||||
with (
|
|
||||||
patch.object(test_bot, "web_text_first_available", side_effect = TimeoutError) as web_text,
|
|
||||||
patch.object(test_bot, "_auth_probe_login_state", new_callable = AsyncMock, return_value = LoginState.LOGGED_OUT) as probe,
|
|
||||||
):
|
|
||||||
assert await test_bot.get_login_state() == LoginState.LOGGED_OUT
|
|
||||||
web_text.assert_awaited_once()
|
|
||||||
probe.assert_awaited_once()
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
|
||||||
async def test_get_login_state_returns_unknown_when_probe_unknown_and_dom_inconclusive(self, test_bot:KleinanzeigenBot) -> None:
|
|
||||||
with (
|
|
||||||
patch.object(test_bot, "_auth_probe_login_state", new_callable = AsyncMock, return_value = LoginState.UNKNOWN) as probe,
|
|
||||||
patch.object(test_bot, "web_text_first_available", side_effect = TimeoutError) as web_text,
|
|
||||||
):
|
):
|
||||||
assert await test_bot.get_login_state() == LoginState.UNKNOWN
|
assert await test_bot.get_login_state() == LoginState.UNKNOWN
|
||||||
probe.assert_awaited_once()
|
assert web_text.await_count == 2
|
||||||
web_text.assert_awaited_once()
|
assert cta_find.await_count == 2
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_get_login_state_returns_logged_out_when_cta_detected(self, test_bot:KleinanzeigenBot) -> None:
|
||||||
|
matched_element = MagicMock(spec = Element)
|
||||||
|
with (
|
||||||
|
patch.object(
|
||||||
|
test_bot,
|
||||||
|
"web_text_first_available",
|
||||||
|
side_effect = [TimeoutError(), TimeoutError()],
|
||||||
|
) as web_text,
|
||||||
|
patch.object(test_bot, "web_find_first_available", new_callable = AsyncMock, return_value = (matched_element, 0)),
|
||||||
|
patch.object(test_bot, "_extract_visible_text", new_callable = AsyncMock, return_value = "Hier einloggen"),
|
||||||
|
):
|
||||||
|
assert await test_bot.get_login_state() == LoginState.LOGGED_OUT
|
||||||
|
assert web_text.await_count == 2
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
async def test_get_login_state_unknown_captures_diagnostics_when_enabled(self, test_bot:KleinanzeigenBot, tmp_path:Path) -> None:
|
async def test_get_login_state_unknown_captures_diagnostics_when_enabled(self, test_bot:KleinanzeigenBot, tmp_path:Path) -> None:
|
||||||
@@ -592,8 +657,8 @@ class TestKleinanzeigenBotAuthentication:
|
|||||||
test_bot.page = page
|
test_bot.page = page
|
||||||
|
|
||||||
with (
|
with (
|
||||||
patch.object(test_bot, "_auth_probe_login_state", new_callable = AsyncMock, return_value = LoginState.UNKNOWN),
|
patch.object(test_bot, "web_text_first_available", side_effect = [TimeoutError(), TimeoutError(), TimeoutError(), TimeoutError()]),
|
||||||
patch.object(test_bot, "web_text_first_available", side_effect = TimeoutError),
|
patch.object(test_bot, "web_find_first_available", side_effect = TimeoutError()),
|
||||||
):
|
):
|
||||||
assert await test_bot.get_login_state() == LoginState.UNKNOWN
|
assert await test_bot.get_login_state() == LoginState.UNKNOWN
|
||||||
|
|
||||||
@@ -610,8 +675,8 @@ class TestKleinanzeigenBotAuthentication:
|
|||||||
test_bot.page = page
|
test_bot.page = page
|
||||||
|
|
||||||
with (
|
with (
|
||||||
patch.object(test_bot, "_auth_probe_login_state", new_callable = AsyncMock, return_value = LoginState.UNKNOWN),
|
patch.object(test_bot, "web_text_first_available", side_effect = [TimeoutError(), TimeoutError(), TimeoutError(), TimeoutError()]),
|
||||||
patch.object(test_bot, "web_text_first_available", side_effect = TimeoutError),
|
patch.object(test_bot, "web_find_first_available", side_effect = TimeoutError()),
|
||||||
):
|
):
|
||||||
assert await test_bot.get_login_state() == LoginState.UNKNOWN
|
assert await test_bot.get_login_state() == LoginState.UNKNOWN
|
||||||
|
|
||||||
@@ -633,8 +698,21 @@ class TestKleinanzeigenBotAuthentication:
|
|||||||
stdin_mock.isatty.return_value = True
|
stdin_mock.isatty.return_value = True
|
||||||
|
|
||||||
with (
|
with (
|
||||||
patch.object(test_bot, "_auth_probe_login_state", new_callable = AsyncMock, return_value = LoginState.UNKNOWN),
|
patch.object(
|
||||||
patch.object(test_bot, "web_text_first_available", side_effect = TimeoutError),
|
test_bot,
|
||||||
|
"web_text_first_available",
|
||||||
|
side_effect = [
|
||||||
|
TimeoutError(),
|
||||||
|
TimeoutError(),
|
||||||
|
TimeoutError(),
|
||||||
|
TimeoutError(),
|
||||||
|
TimeoutError(),
|
||||||
|
TimeoutError(),
|
||||||
|
TimeoutError(),
|
||||||
|
TimeoutError(),
|
||||||
|
],
|
||||||
|
),
|
||||||
|
patch.object(test_bot, "web_find_first_available", side_effect = TimeoutError()),
|
||||||
patch("kleinanzeigen_bot.sys.stdin", stdin_mock),
|
patch("kleinanzeigen_bot.sys.stdin", stdin_mock),
|
||||||
patch("kleinanzeigen_bot.ainput", new_callable = AsyncMock) as mock_ainput,
|
patch("kleinanzeigen_bot.ainput", new_callable = AsyncMock) as mock_ainput,
|
||||||
):
|
):
|
||||||
@@ -661,8 +739,8 @@ class TestKleinanzeigenBotAuthentication:
|
|||||||
stdin_mock.isatty.return_value = False
|
stdin_mock.isatty.return_value = False
|
||||||
|
|
||||||
with (
|
with (
|
||||||
patch.object(test_bot, "_auth_probe_login_state", new_callable = AsyncMock, return_value = LoginState.UNKNOWN),
|
patch.object(test_bot, "web_text_first_available", side_effect = [TimeoutError(), TimeoutError(), TimeoutError(), TimeoutError()]),
|
||||||
patch.object(test_bot, "web_text_first_available", side_effect = TimeoutError),
|
patch.object(test_bot, "web_find_first_available", side_effect = TimeoutError()),
|
||||||
patch("kleinanzeigen_bot.sys.stdin", stdin_mock),
|
patch("kleinanzeigen_bot.sys.stdin", stdin_mock),
|
||||||
patch("kleinanzeigen_bot.ainput", new_callable = AsyncMock) as mock_ainput,
|
patch("kleinanzeigen_bot.ainput", new_callable = AsyncMock) as mock_ainput,
|
||||||
):
|
):
|
||||||
@@ -676,67 +754,71 @@ class TestKleinanzeigenBotAuthentication:
|
|||||||
with (
|
with (
|
||||||
patch.object(test_bot, "web_open") as mock_open,
|
patch.object(test_bot, "web_open") as mock_open,
|
||||||
patch.object(test_bot, "get_login_state", new_callable = AsyncMock, side_effect = [LoginState.LOGGED_OUT, LoginState.LOGGED_IN]) as mock_logged_in,
|
patch.object(test_bot, "get_login_state", new_callable = AsyncMock, side_effect = [LoginState.LOGGED_OUT, LoginState.LOGGED_IN]) as mock_logged_in,
|
||||||
patch.object(test_bot, "web_find", side_effect = TimeoutError),
|
patch.object(test_bot, "_click_gdpr_banner", new_callable = AsyncMock),
|
||||||
patch.object(test_bot, "web_input") as mock_input,
|
patch.object(test_bot, "fill_login_data_and_send", new_callable = AsyncMock) as mock_fill,
|
||||||
patch.object(test_bot, "web_click") as mock_click,
|
patch.object(test_bot, "handle_after_login_logic", new_callable = AsyncMock) as mock_after_login,
|
||||||
|
patch.object(test_bot, "_dismiss_consent_banner", new_callable = AsyncMock),
|
||||||
):
|
):
|
||||||
await test_bot.login()
|
await test_bot.login()
|
||||||
|
|
||||||
mock_open.assert_called()
|
opened_urls = [call.args[0] for call in mock_open.call_args_list]
|
||||||
mock_logged_in.assert_called()
|
assert any(url.startswith(test_bot.root_url) for url in opened_urls)
|
||||||
mock_input.assert_called()
|
assert any(url.endswith("/m-einloggen-sso.html") for url in opened_urls)
|
||||||
mock_click.assert_called()
|
mock_logged_in.assert_awaited()
|
||||||
|
mock_fill.assert_awaited_once()
|
||||||
|
mock_after_login.assert_awaited_once()
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
async def test_login_flow_handles_captcha(self, test_bot:KleinanzeigenBot) -> None:
|
async def test_login_flow_returns_early_when_already_logged_in(self, test_bot:KleinanzeigenBot) -> None:
|
||||||
"""Verify that login flow handles captcha correctly."""
|
"""Login should return early when state is already LOGGED_IN."""
|
||||||
with (
|
with (
|
||||||
patch.object(test_bot, "web_open"),
|
patch.object(test_bot, "web_open") as mock_open,
|
||||||
patch.object(
|
patch.object(test_bot, "get_login_state", new_callable = AsyncMock, return_value = LoginState.LOGGED_IN) as mock_state,
|
||||||
test_bot,
|
patch.object(test_bot, "_click_gdpr_banner", new_callable = AsyncMock),
|
||||||
"get_login_state",
|
patch.object(test_bot, "fill_login_data_and_send", new_callable = AsyncMock) as mock_fill,
|
||||||
new_callable = AsyncMock,
|
patch.object(test_bot, "handle_after_login_logic", new_callable = AsyncMock) as mock_after_login,
|
||||||
side_effect = [LoginState.LOGGED_OUT, LoginState.LOGGED_OUT, LoginState.LOGGED_IN],
|
|
||||||
),
|
|
||||||
patch.object(test_bot, "web_find") as mock_find,
|
|
||||||
patch.object(test_bot, "web_input") as mock_input,
|
|
||||||
patch.object(test_bot, "web_click") as mock_click,
|
|
||||||
patch("kleinanzeigen_bot.ainput", new_callable = AsyncMock) as mock_ainput,
|
|
||||||
):
|
):
|
||||||
# Mock the sequence of web_find calls:
|
|
||||||
# 0. Consent banner not found (in _dismiss_consent_banner, before login state check)
|
|
||||||
# First login attempt:
|
|
||||||
# 1. Captcha iframe found (in check_and_wait_for_captcha)
|
|
||||||
# 2. Phone verification not found (in handle_after_login_logic)
|
|
||||||
# 3. Email verification not found (in handle_after_login_logic)
|
|
||||||
# 4. GDPR banner not found (in handle_after_login_logic)
|
|
||||||
# Second login attempt:
|
|
||||||
# 5. Captcha iframe found (in check_and_wait_for_captcha)
|
|
||||||
# 6. Phone verification not found (in handle_after_login_logic)
|
|
||||||
# 7. Email verification not found (in handle_after_login_logic)
|
|
||||||
# 8. GDPR banner not found (in handle_after_login_logic)
|
|
||||||
mock_find.side_effect = [
|
|
||||||
TimeoutError(), # Consent banner (before login state check)
|
|
||||||
AsyncMock(), # Captcha iframe (first login)
|
|
||||||
TimeoutError(), # Phone verification (first login)
|
|
||||||
TimeoutError(), # Email verification (first login)
|
|
||||||
TimeoutError(), # GDPR banner (first login)
|
|
||||||
AsyncMock(), # Captcha iframe (second login)
|
|
||||||
TimeoutError(), # Phone verification (second login)
|
|
||||||
TimeoutError(), # Email verification (second login)
|
|
||||||
TimeoutError(), # GDPR banner (second login)
|
|
||||||
]
|
|
||||||
mock_ainput.return_value = ""
|
|
||||||
mock_input.return_value = AsyncMock()
|
|
||||||
mock_click.return_value = AsyncMock()
|
|
||||||
|
|
||||||
await test_bot.login()
|
await test_bot.login()
|
||||||
|
|
||||||
# Verify the complete flow
|
mock_open.assert_awaited_once()
|
||||||
assert mock_find.call_count == 9 # 1 consent banner + 8 original web_find calls
|
assert mock_open.await_args is not None
|
||||||
assert mock_ainput.call_count == 2 # Two captcha prompts
|
assert mock_open.await_args.args[0] == test_bot.root_url
|
||||||
assert mock_input.call_count == 6 # Two login attempts with username, clear password, and set password
|
mock_state.assert_awaited_once()
|
||||||
assert mock_click.call_count == 2 # Two submit button clicks
|
mock_fill.assert_not_called()
|
||||||
|
mock_after_login.assert_not_called()
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_login_flow_raises_when_state_remains_unknown(self, test_bot:KleinanzeigenBot) -> None:
|
||||||
|
"""Post-login UNKNOWN state should fail fast with diagnostics."""
|
||||||
|
with (
|
||||||
|
patch.object(test_bot, "web_open"),
|
||||||
|
patch.object(test_bot, "get_login_state", new_callable = AsyncMock, side_effect = [LoginState.LOGGED_OUT, LoginState.UNKNOWN]) as mock_state,
|
||||||
|
patch.object(test_bot, "_click_gdpr_banner", new_callable = AsyncMock),
|
||||||
|
patch.object(test_bot, "fill_login_data_and_send", new_callable = AsyncMock),
|
||||||
|
patch.object(test_bot, "handle_after_login_logic", new_callable = AsyncMock),
|
||||||
|
patch.object(test_bot, "_dismiss_consent_banner", new_callable = AsyncMock),
|
||||||
|
patch.object(test_bot, "_capture_login_detection_diagnostics_if_enabled", new_callable = AsyncMock) as mock_diagnostics,
|
||||||
|
):
|
||||||
|
with pytest.raises(AssertionError, match = "Login could not be confirmed"):
|
||||||
|
await test_bot.login()
|
||||||
|
|
||||||
|
mock_diagnostics.assert_awaited_once()
|
||||||
|
mock_state.assert_awaited()
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_login_flow_raises_when_sso_navigation_times_out(self, test_bot:KleinanzeigenBot) -> None:
|
||||||
|
"""SSO navigation timeout should trigger diagnostics and re-raise."""
|
||||||
|
with (
|
||||||
|
patch.object(test_bot, "web_open", new_callable = AsyncMock, side_effect = [None, TimeoutError("sso timeout")]),
|
||||||
|
patch.object(test_bot, "get_login_state", new_callable = AsyncMock, return_value = LoginState.LOGGED_OUT) as mock_state,
|
||||||
|
patch.object(test_bot, "_click_gdpr_banner", new_callable = AsyncMock),
|
||||||
|
patch.object(test_bot, "_capture_login_detection_diagnostics_if_enabled", new_callable = AsyncMock) as mock_diagnostics,
|
||||||
|
):
|
||||||
|
with pytest.raises(TimeoutError, match = "sso timeout"):
|
||||||
|
await test_bot.login()
|
||||||
|
|
||||||
|
mock_diagnostics.assert_awaited_once()
|
||||||
|
mock_state.assert_awaited_once()
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
async def test_check_and_wait_for_captcha(self, test_bot:KleinanzeigenBot) -> None:
|
async def test_check_and_wait_for_captcha(self, test_bot:KleinanzeigenBot) -> None:
|
||||||
@@ -764,62 +846,142 @@ class TestKleinanzeigenBotAuthentication:
|
|||||||
async def test_fill_login_data_and_send(self, test_bot:KleinanzeigenBot) -> None:
|
async def test_fill_login_data_and_send(self, test_bot:KleinanzeigenBot) -> None:
|
||||||
"""Verify that login form filling works correctly."""
|
"""Verify that login form filling works correctly."""
|
||||||
with (
|
with (
|
||||||
|
patch.object(test_bot, "_wait_for_auth0_login_context", new_callable = AsyncMock) as wait_context,
|
||||||
|
patch.object(test_bot, "_wait_for_auth0_password_step", new_callable = AsyncMock) as wait_password,
|
||||||
|
patch.object(test_bot, "_wait_for_post_auth0_submit_transition", new_callable = AsyncMock) as wait_transition,
|
||||||
patch.object(test_bot, "web_input") as mock_input,
|
patch.object(test_bot, "web_input") as mock_input,
|
||||||
patch.object(test_bot, "web_click") as mock_click,
|
patch.object(test_bot, "web_click") as mock_click,
|
||||||
patch.object(test_bot, "check_and_wait_for_captcha", new_callable = AsyncMock) as mock_captcha,
|
patch.object(test_bot, "check_and_wait_for_captcha", new_callable = AsyncMock) as mock_captcha,
|
||||||
):
|
):
|
||||||
# Mock successful login form interaction
|
|
||||||
mock_input.return_value = AsyncMock()
|
|
||||||
mock_click.return_value = AsyncMock()
|
|
||||||
|
|
||||||
await test_bot.fill_login_data_and_send()
|
await test_bot.fill_login_data_and_send()
|
||||||
|
|
||||||
|
wait_context.assert_awaited_once()
|
||||||
|
wait_password.assert_awaited_once()
|
||||||
|
wait_transition.assert_awaited_once()
|
||||||
assert mock_captcha.call_count == 1
|
assert mock_captcha.call_count == 1
|
||||||
assert mock_input.call_count == 3 # Username, clear password, set password
|
assert mock_input.call_count == 2
|
||||||
assert mock_click.call_count == 1 # Submit button
|
assert mock_click.call_count == 2
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_fill_login_data_and_send_logs_generic_start_message(
|
||||||
|
self, test_bot:KleinanzeigenBot, caplog:pytest.LogCaptureFixture
|
||||||
|
) -> None:
|
||||||
|
with (
|
||||||
|
caplog.at_level("INFO"),
|
||||||
|
patch.object(test_bot, "_wait_for_auth0_login_context", new_callable = AsyncMock),
|
||||||
|
patch.object(test_bot, "_wait_for_auth0_password_step", new_callable = AsyncMock),
|
||||||
|
patch.object(test_bot, "_wait_for_post_auth0_submit_transition", new_callable = AsyncMock),
|
||||||
|
patch.object(test_bot, "web_input"),
|
||||||
|
patch.object(test_bot, "web_click"),
|
||||||
|
patch.object(test_bot, "check_and_wait_for_captcha", new_callable = AsyncMock),
|
||||||
|
):
|
||||||
|
await test_bot.fill_login_data_and_send()
|
||||||
|
|
||||||
|
assert "Logging in..." in caplog.text
|
||||||
|
assert test_bot.config.login.username not in caplog.text
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_fill_login_data_and_send_fails_when_password_step_missing(self, test_bot:KleinanzeigenBot) -> None:
|
||||||
|
"""Missing Auth0 password step should fail fast."""
|
||||||
|
with (
|
||||||
|
patch.object(test_bot, "_wait_for_auth0_login_context", new_callable = AsyncMock),
|
||||||
|
patch.object(test_bot, "_wait_for_auth0_password_step", new_callable = AsyncMock, side_effect = AssertionError("missing password")),
|
||||||
|
patch.object(test_bot, "web_input") as mock_input,
|
||||||
|
patch.object(test_bot, "web_click") as mock_click,
|
||||||
|
):
|
||||||
|
with pytest.raises(AssertionError, match = "missing password"):
|
||||||
|
await test_bot.fill_login_data_and_send()
|
||||||
|
|
||||||
|
assert mock_input.call_count == 1
|
||||||
|
assert mock_click.call_count == 1
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_wait_for_post_auth0_submit_transition_url_branch(self, test_bot:KleinanzeigenBot) -> None:
|
||||||
|
"""URL transition success should return without fallback checks."""
|
||||||
|
with (
|
||||||
|
patch.object(test_bot, "web_await", new_callable = AsyncMock, return_value = True) as mock_wait,
|
||||||
|
patch.object(test_bot, "web_sleep", new_callable = AsyncMock) as mock_sleep,
|
||||||
|
):
|
||||||
|
await test_bot._wait_for_post_auth0_submit_transition()
|
||||||
|
|
||||||
|
mock_wait.assert_awaited_once()
|
||||||
|
mock_sleep.assert_not_called()
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_wait_for_post_auth0_submit_transition_dom_fallback_branch(self, test_bot:KleinanzeigenBot) -> None:
|
||||||
|
"""DOM fallback should run when URL transition is inconclusive."""
|
||||||
|
with (
|
||||||
|
patch.object(test_bot, "web_await", new_callable = AsyncMock, side_effect = [TimeoutError()]) as mock_wait,
|
||||||
|
patch.object(test_bot, "is_logged_in", new_callable = AsyncMock, return_value = True) as mock_is_logged_in,
|
||||||
|
patch.object(test_bot, "web_sleep", new_callable = AsyncMock) as mock_sleep,
|
||||||
|
):
|
||||||
|
await test_bot._wait_for_post_auth0_submit_transition()
|
||||||
|
|
||||||
|
mock_wait.assert_awaited_once()
|
||||||
|
mock_is_logged_in.assert_awaited_once()
|
||||||
|
mock_sleep.assert_not_called()
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_wait_for_post_auth0_submit_transition_sleep_fallback_branch(self, test_bot:KleinanzeigenBot) -> None:
|
||||||
|
"""Sleep fallback should run when bounded login check times out."""
|
||||||
|
with (
|
||||||
|
patch.object(test_bot, "web_await", new_callable = AsyncMock, side_effect = [TimeoutError()]) as mock_wait,
|
||||||
|
patch.object(test_bot, "is_logged_in", new_callable = AsyncMock, side_effect = asyncio.TimeoutError) as mock_is_logged_in,
|
||||||
|
patch.object(test_bot, "web_sleep", new_callable = AsyncMock) as mock_sleep,
|
||||||
|
):
|
||||||
|
with pytest.raises(TimeoutError, match = "Auth0 post-submit verification remained inconclusive"):
|
||||||
|
await test_bot._wait_for_post_auth0_submit_transition()
|
||||||
|
|
||||||
|
mock_wait.assert_awaited_once()
|
||||||
|
assert mock_is_logged_in.await_count == 2
|
||||||
|
mock_sleep.assert_awaited_once()
|
||||||
|
assert mock_sleep.await_args is not None
|
||||||
|
sleep_kwargs = cast(Any, mock_sleep.await_args).kwargs
|
||||||
|
assert sleep_kwargs["min_ms"] < sleep_kwargs["max_ms"]
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_wait_for_post_auth0_submit_transition_sleep_fallback_when_login_not_confirmed(
|
||||||
|
self, test_bot:KleinanzeigenBot
|
||||||
|
) -> None:
|
||||||
|
"""Sleep fallback should run when bounded login check returns False."""
|
||||||
|
with (
|
||||||
|
patch.object(test_bot, "web_await", new_callable = AsyncMock, side_effect = [TimeoutError()]) as mock_wait,
|
||||||
|
patch.object(test_bot, "is_logged_in", new_callable = AsyncMock, return_value = False) as mock_is_logged_in,
|
||||||
|
patch.object(test_bot, "web_sleep", new_callable = AsyncMock) as mock_sleep,
|
||||||
|
):
|
||||||
|
with pytest.raises(TimeoutError, match = "Auth0 post-submit verification remained inconclusive"):
|
||||||
|
await test_bot._wait_for_post_auth0_submit_transition()
|
||||||
|
|
||||||
|
mock_wait.assert_awaited_once()
|
||||||
|
assert mock_is_logged_in.await_count == 2
|
||||||
|
mock_sleep.assert_awaited_once()
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_click_gdpr_banner_uses_quick_dom_timeout_and_passes_click_timeout(self, test_bot:KleinanzeigenBot) -> None:
|
||||||
|
with (
|
||||||
|
patch.object(test_bot, "_timeout", return_value = 1.25) as mock_timeout,
|
||||||
|
patch.object(test_bot, "web_find", new_callable = AsyncMock) as mock_find,
|
||||||
|
patch.object(test_bot, "web_click", new_callable = AsyncMock) as mock_click,
|
||||||
|
):
|
||||||
|
await test_bot._click_gdpr_banner()
|
||||||
|
|
||||||
|
mock_timeout.assert_called_once_with("quick_dom")
|
||||||
|
mock_find.assert_awaited_once_with(By.ID, "gdpr-banner-accept", timeout = 1.25)
|
||||||
|
mock_click.assert_awaited_once_with(By.ID, "gdpr-banner-accept", timeout = 1.25)
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
async def test_handle_after_login_logic(self, test_bot:KleinanzeigenBot) -> None:
|
async def test_handle_after_login_logic(self, test_bot:KleinanzeigenBot) -> None:
|
||||||
"""Verify that post-login handling works correctly."""
|
"""Verify that post-login handling works correctly."""
|
||||||
with (
|
with (
|
||||||
patch.object(test_bot, "web_find") as mock_find,
|
patch.object(test_bot, "_check_sms_verification", new_callable = AsyncMock, side_effect = TimeoutError()) as mock_sms,
|
||||||
patch.object(test_bot, "web_click") as mock_click,
|
patch.object(test_bot, "_check_email_verification", new_callable = AsyncMock, side_effect = TimeoutError()) as mock_email,
|
||||||
patch("kleinanzeigen_bot.ainput", new_callable = AsyncMock) as mock_ainput,
|
patch.object(test_bot, "_click_gdpr_banner", new_callable = AsyncMock, side_effect = TimeoutError()) as mock_gdpr,
|
||||||
):
|
):
|
||||||
# Test case 1: No special handling needed
|
|
||||||
mock_find.side_effect = [TimeoutError(), TimeoutError(), TimeoutError()] # No phone verification, no email verification, no GDPR
|
|
||||||
mock_click.return_value = AsyncMock()
|
|
||||||
mock_ainput.return_value = ""
|
|
||||||
|
|
||||||
await test_bot.handle_after_login_logic()
|
await test_bot.handle_after_login_logic()
|
||||||
|
|
||||||
assert mock_find.call_count == 3
|
mock_sms.assert_awaited_once()
|
||||||
assert mock_click.call_count == 0
|
mock_email.assert_awaited_once()
|
||||||
assert mock_ainput.call_count == 0
|
mock_gdpr.assert_awaited_once()
|
||||||
|
|
||||||
# Test case 2: Phone verification needed
|
|
||||||
mock_find.reset_mock()
|
|
||||||
mock_click.reset_mock()
|
|
||||||
mock_ainput.reset_mock()
|
|
||||||
mock_find.side_effect = [AsyncMock(), TimeoutError(), TimeoutError()] # Phone verification found, no email verification, no GDPR
|
|
||||||
|
|
||||||
await test_bot.handle_after_login_logic()
|
|
||||||
|
|
||||||
assert mock_find.call_count == 3
|
|
||||||
assert mock_click.call_count == 0 # No click needed, just wait for user
|
|
||||||
assert mock_ainput.call_count == 1 # Wait for user to complete verification
|
|
||||||
|
|
||||||
# Test case 3: GDPR banner present
|
|
||||||
mock_find.reset_mock()
|
|
||||||
mock_click.reset_mock()
|
|
||||||
mock_ainput.reset_mock()
|
|
||||||
mock_find.side_effect = [TimeoutError(), TimeoutError(), AsyncMock()] # No phone verification, no email verification, GDPR found
|
|
||||||
|
|
||||||
await test_bot.handle_after_login_logic()
|
|
||||||
|
|
||||||
assert mock_find.call_count == 3
|
|
||||||
assert mock_click.call_count == 2 # Click to accept GDPR and continue
|
|
||||||
assert mock_ainput.call_count == 0
|
|
||||||
|
|
||||||
|
|
||||||
class TestKleinanzeigenBotDiagnostics:
|
class TestKleinanzeigenBotDiagnostics:
|
||||||
@@ -866,9 +1028,10 @@ class TestKleinanzeigenBotDiagnostics:
|
|||||||
ad_cfg = Ad.model_validate(diagnostics_ad_config)
|
ad_cfg = Ad.model_validate(diagnostics_ad_config)
|
||||||
ad_cfg_orig = copy.deepcopy(diagnostics_ad_config)
|
ad_cfg_orig = copy.deepcopy(diagnostics_ad_config)
|
||||||
ad_file = str(tmp_path / "ad_000001_Test.yml")
|
ad_file = str(tmp_path / "ad_000001_Test.yml")
|
||||||
|
ads_response = {"content": json.dumps({"ads": [], "paging": {"pageNum": 1, "last": 1}})}
|
||||||
|
|
||||||
with (
|
with (
|
||||||
patch.object(test_bot, "web_request", new_callable = AsyncMock, return_value = {"content": json.dumps({"ads": []})}),
|
patch.object(test_bot, "web_request", new_callable = AsyncMock, return_value = ads_response),
|
||||||
patch.object(test_bot, "publish_ad", new_callable = AsyncMock, side_effect = TimeoutError("boom")),
|
patch.object(test_bot, "publish_ad", new_callable = AsyncMock, side_effect = TimeoutError("boom")),
|
||||||
):
|
):
|
||||||
await test_bot.publish_ads([(ad_file, ad_cfg, ad_cfg_orig)])
|
await test_bot.publish_ads([(ad_file, ad_cfg, ad_cfg_orig)])
|
||||||
@@ -907,9 +1070,10 @@ class TestKleinanzeigenBotDiagnostics:
|
|||||||
ad_cfg = Ad.model_validate(diagnostics_ad_config)
|
ad_cfg = Ad.model_validate(diagnostics_ad_config)
|
||||||
ad_cfg_orig = copy.deepcopy(diagnostics_ad_config)
|
ad_cfg_orig = copy.deepcopy(diagnostics_ad_config)
|
||||||
ad_file = str(tmp_path / "ad_000001_Test.yml")
|
ad_file = str(tmp_path / "ad_000001_Test.yml")
|
||||||
|
ads_response = {"content": json.dumps({"ads": [], "paging": {"pageNum": 1, "last": 1}})}
|
||||||
|
|
||||||
with (
|
with (
|
||||||
patch.object(test_bot, "web_request", new_callable = AsyncMock, return_value = {"content": json.dumps({"ads": []})}),
|
patch.object(test_bot, "web_request", new_callable = AsyncMock, return_value = ads_response),
|
||||||
patch.object(test_bot, "publish_ad", new_callable = AsyncMock, side_effect = TimeoutError("boom")),
|
patch.object(test_bot, "publish_ad", new_callable = AsyncMock, side_effect = TimeoutError("boom")),
|
||||||
):
|
):
|
||||||
await test_bot.publish_ads([(ad_file, ad_cfg, ad_cfg_orig)])
|
await test_bot.publish_ads([(ad_file, ad_cfg, ad_cfg_orig)])
|
||||||
@@ -1015,6 +1179,35 @@ class TestKleinanzeigenBotBasics:
|
|||||||
web_await_mock.assert_awaited_once()
|
web_await_mock.assert_awaited_once()
|
||||||
delete_ad_mock.assert_awaited_once_with(ad_cfgs[0][1], [], delete_old_ads_by_title = False)
|
delete_ad_mock.assert_awaited_once_with(ad_cfgs[0][1], [], delete_old_ads_by_title = False)
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_publish_ads_uses_millisecond_retry_delay_on_retryable_failure(
|
||||||
|
self,
|
||||||
|
test_bot:KleinanzeigenBot,
|
||||||
|
base_ad_config:dict[str, Any],
|
||||||
|
mock_page:MagicMock,
|
||||||
|
) -> None:
|
||||||
|
"""Retry branch should sleep with explicit millisecond delay."""
|
||||||
|
test_bot.page = mock_page
|
||||||
|
test_bot.keep_old_ads = True
|
||||||
|
|
||||||
|
ad_cfg = Ad.model_validate(base_ad_config)
|
||||||
|
ad_cfg_orig = copy.deepcopy(base_ad_config)
|
||||||
|
ad_file = "ad.yaml"
|
||||||
|
ads_response = {"content": json.dumps({"ads": [], "paging": {"pageNum": 1, "last": 1}})}
|
||||||
|
|
||||||
|
with (
|
||||||
|
patch.object(test_bot, "web_request", new_callable = AsyncMock, return_value = ads_response),
|
||||||
|
patch.object(test_bot, "publish_ad", new_callable = AsyncMock, side_effect = [TimeoutError("transient"), None]) as publish_mock,
|
||||||
|
patch.object(test_bot, "_detect_new_published_ad_ids", new_callable = AsyncMock, return_value = set()) as detect_mock,
|
||||||
|
patch.object(test_bot, "web_sleep", new_callable = AsyncMock) as sleep_mock,
|
||||||
|
patch.object(test_bot, "web_await", new_callable = AsyncMock, return_value = True),
|
||||||
|
):
|
||||||
|
await test_bot.publish_ads([(ad_file, ad_cfg, ad_cfg_orig)])
|
||||||
|
|
||||||
|
assert publish_mock.await_count == 2
|
||||||
|
detect_mock.assert_awaited_once()
|
||||||
|
sleep_mock.assert_awaited_once_with(2_000)
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
async def test_publish_ads_aborts_retry_on_duplicate_detection(
|
async def test_publish_ads_aborts_retry_on_duplicate_detection(
|
||||||
self,
|
self,
|
||||||
@@ -1047,6 +1240,62 @@ class TestKleinanzeigenBotBasics:
|
|||||||
# publish_ad should have been called only once — retry was aborted due to duplicate detection
|
# publish_ad should have been called only once — retry was aborted due to duplicate detection
|
||||||
assert publish_mock.await_count == 1
|
assert publish_mock.await_count == 1
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_publish_ads_aborts_retry_when_duplicate_verification_fetch_is_malformed(
|
||||||
|
self,
|
||||||
|
test_bot:KleinanzeigenBot,
|
||||||
|
base_ad_config:dict[str, Any],
|
||||||
|
mock_page:MagicMock,
|
||||||
|
) -> None:
|
||||||
|
"""Retry verification must fail closed on malformed published-ads responses."""
|
||||||
|
test_bot.page = mock_page
|
||||||
|
|
||||||
|
ad_cfg = Ad.model_validate(base_ad_config)
|
||||||
|
ad_cfg_orig = copy.deepcopy(base_ad_config)
|
||||||
|
ad_file = "ad.yaml"
|
||||||
|
|
||||||
|
fetch_responses = [
|
||||||
|
{"content": json.dumps({"ads": []})},
|
||||||
|
{"content": json.dumps({"ads": []})},
|
||||||
|
[],
|
||||||
|
]
|
||||||
|
|
||||||
|
with (
|
||||||
|
patch.object(test_bot, "web_request", new_callable = AsyncMock, side_effect = fetch_responses),
|
||||||
|
patch.object(test_bot, "publish_ad", new_callable = AsyncMock, side_effect = TimeoutError("image upload timeout")) as publish_mock,
|
||||||
|
):
|
||||||
|
await test_bot.publish_ads([(ad_file, ad_cfg, ad_cfg_orig)])
|
||||||
|
|
||||||
|
assert publish_mock.await_count == 1
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_publish_ads_aborts_retry_when_duplicate_verification_ads_entries_are_malformed(
|
||||||
|
self,
|
||||||
|
test_bot:KleinanzeigenBot,
|
||||||
|
base_ad_config:dict[str, Any],
|
||||||
|
mock_page:MagicMock,
|
||||||
|
) -> None:
|
||||||
|
"""Retry verification must fail closed when strict fetch returns non-dict ad entries."""
|
||||||
|
test_bot.page = mock_page
|
||||||
|
|
||||||
|
ad_cfg = Ad.model_validate(base_ad_config)
|
||||||
|
ad_cfg_orig = copy.deepcopy(base_ad_config)
|
||||||
|
ad_file = "ad.yaml"
|
||||||
|
|
||||||
|
fetch_responses = [
|
||||||
|
{"content": json.dumps({"ads": [], "paging": {"pageNum": 1, "last": 1}})},
|
||||||
|
{"content": json.dumps({"ads": [], "paging": {"pageNum": 1, "last": 1}})},
|
||||||
|
{"content": json.dumps({"ads": [42], "paging": {"pageNum": 1, "last": 1}})},
|
||||||
|
]
|
||||||
|
|
||||||
|
with (
|
||||||
|
patch.object(test_bot, "web_request", new_callable = AsyncMock, side_effect = fetch_responses),
|
||||||
|
patch.object(test_bot, "publish_ad", new_callable = AsyncMock, side_effect = TimeoutError("image upload timeout")) as publish_mock,
|
||||||
|
):
|
||||||
|
await test_bot.publish_ads([(ad_file, ad_cfg, ad_cfg_orig)])
|
||||||
|
|
||||||
|
assert publish_mock.await_count == 1
|
||||||
|
|
||||||
def test_get_root_url(self, test_bot:KleinanzeigenBot) -> None:
|
def test_get_root_url(self, test_bot:KleinanzeigenBot) -> None:
|
||||||
"""Test root URL retrieval."""
|
"""Test root URL retrieval."""
|
||||||
assert test_bot.root_url == "https://www.kleinanzeigen.de"
|
assert test_bot.root_url == "https://www.kleinanzeigen.de"
|
||||||
|
|||||||
@@ -187,6 +187,17 @@ class TestJSONPagination:
|
|||||||
pytest.fail(f"expected 2 ads, got {len(result)}")
|
pytest.fail(f"expected 2 ads, got {len(result)}")
|
||||||
mock_request.assert_awaited_once()
|
mock_request.assert_awaited_once()
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_fetch_published_ads_strict_raises_on_missing_paging_dict(self, bot:KleinanzeigenBot) -> None:
|
||||||
|
"""Strict mode should fail closed when paging metadata is missing."""
|
||||||
|
response_data = {"ads": [{"id": 1}, {"id": 2}]}
|
||||||
|
|
||||||
|
with patch.object(bot, "web_request", new_callable = AsyncMock) as mock_request:
|
||||||
|
mock_request.return_value = {"content": json.dumps(response_data)}
|
||||||
|
|
||||||
|
with pytest.raises(ValueError, match = "Missing or invalid paging info on page 1: NoneType"):
|
||||||
|
await bot._fetch_published_ads(strict = True)
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
async def test_fetch_published_ads_non_integer_paging_values(self, bot:KleinanzeigenBot) -> None:
|
async def test_fetch_published_ads_non_integer_paging_values(self, bot:KleinanzeigenBot) -> None:
|
||||||
"""Test handling of non-integer paging values."""
|
"""Test handling of non-integer paging values."""
|
||||||
@@ -219,6 +230,33 @@ class TestJSONPagination:
|
|||||||
if len(result) != 0:
|
if len(result) != 0:
|
||||||
pytest.fail(f"expected empty list when 'ads' is not a list, got: {result}")
|
pytest.fail(f"expected empty list when 'ads' is not a list, got: {result}")
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_fetch_published_ads_strict_rejects_non_dict_entries(self, bot:KleinanzeigenBot) -> None:
|
||||||
|
"""Strict mode should reject malformed entries inside ads list."""
|
||||||
|
response_data = {"ads": [42, {"id": 1}], "paging": {"pageNum": 1, "last": 1}}
|
||||||
|
|
||||||
|
with patch.object(bot, "web_request", new_callable = AsyncMock) as mock_request:
|
||||||
|
mock_request.return_value = {"content": json.dumps(response_data)}
|
||||||
|
|
||||||
|
with pytest.raises(TypeError, match = "Unexpected ad entry type on page 1: int"):
|
||||||
|
await bot._fetch_published_ads(strict = True)
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_fetch_published_ads_non_strict_filters_non_dict_entries(self, bot:KleinanzeigenBot, caplog:pytest.LogCaptureFixture) -> None:
|
||||||
|
"""Non-strict mode should filter malformed entries and continue."""
|
||||||
|
response_data = {"ads": [42, {"id": 1}, "broken"], "paging": {"pageNum": 1, "last": 1}}
|
||||||
|
|
||||||
|
with patch.object(bot, "web_request", new_callable = AsyncMock) as mock_request:
|
||||||
|
mock_request.return_value = {"content": json.dumps(response_data)}
|
||||||
|
|
||||||
|
with caplog.at_level("WARNING"):
|
||||||
|
result = await bot._fetch_published_ads(strict = False)
|
||||||
|
|
||||||
|
if result != [{"id": 1}]:
|
||||||
|
pytest.fail(f"expected malformed entries to be filtered out, got: {result}")
|
||||||
|
if "Filtered 2 malformed ad entries on page 1" not in caplog.text:
|
||||||
|
pytest.fail(f"expected malformed-entry warning in logs, got: {caplog.text}")
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
async def test_fetch_published_ads_timeout(self, bot:KleinanzeigenBot) -> None:
|
async def test_fetch_published_ads_timeout(self, bot:KleinanzeigenBot) -> None:
|
||||||
"""Test handling of timeout during pagination."""
|
"""Test handling of timeout during pagination."""
|
||||||
@@ -229,3 +267,26 @@ class TestJSONPagination:
|
|||||||
|
|
||||||
if result != []:
|
if result != []:
|
||||||
pytest.fail(f"Expected empty list on timeout, got {result}")
|
pytest.fail(f"Expected empty list on timeout, got {result}")
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_fetch_published_ads_non_strict_handles_non_string_content_type(self, bot:KleinanzeigenBot, caplog:pytest.LogCaptureFixture) -> None:
|
||||||
|
"""Non-strict mode should gracefully stop on unexpected non-string content types."""
|
||||||
|
with patch.object(bot, "web_request", new_callable = AsyncMock) as mock_request:
|
||||||
|
mock_request.return_value = {"content": None}
|
||||||
|
|
||||||
|
with caplog.at_level("WARNING"):
|
||||||
|
result = await bot._fetch_published_ads(strict = False)
|
||||||
|
|
||||||
|
if result != []:
|
||||||
|
pytest.fail(f"expected empty result on non-string content in non-strict mode, got: {result}")
|
||||||
|
if "Unexpected response content type on page 1: NoneType" not in caplog.text:
|
||||||
|
pytest.fail(f"expected non-string content warning in logs, got: {caplog.text}")
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_fetch_published_ads_strict_raises_on_non_string_content_type(self, bot:KleinanzeigenBot) -> None:
|
||||||
|
"""Strict mode should fail closed on unexpected non-string content types."""
|
||||||
|
with patch.object(bot, "web_request", new_callable = AsyncMock) as mock_request:
|
||||||
|
mock_request.return_value = {"content": None}
|
||||||
|
|
||||||
|
with pytest.raises(TypeError, match = "Unexpected response content type on page 1: NoneType"):
|
||||||
|
await bot._fetch_published_ads(strict = True)
|
||||||
|
|||||||
Reference in New Issue
Block a user