fix: auth probe + diagnostics for UNKNOWN states (#791)

This commit is contained in:
Jens
2026-01-28 06:08:45 +01:00
committed by GitHub
parent 7098719d5b
commit b4cb979164
8 changed files with 501 additions and 85 deletions

View File

@@ -1,7 +1,7 @@
# SPDX-FileCopyrightText: © Sebastian Thomschke and contributors
# SPDX-License-Identifier: AGPL-3.0-or-later
# SPDX-ArtifactOfProjectHomePage: https://github.com/Second-Hand-Friends/kleinanzeigen-bot/
import atexit, asyncio, enum, json, os, re, signal, sys, textwrap # isort: skip
import atexit, asyncio, enum, json, os, re, secrets, signal, sys, textwrap # isort: skip
import getopt # pylint: disable=deprecated-module
import urllib.parse as urllib_parse
from datetime import datetime
@@ -39,6 +39,12 @@ class AdUpdateStrategy(enum.Enum):
MODIFY = enum.auto()
class LoginState(enum.Enum):
LOGGED_IN = enum.auto()
LOGGED_OUT = enum.auto()
UNKNOWN = enum.auto()
def _repost_cycle_ready(ad_cfg:Ad, ad_file_relative:str) -> bool:
"""
Check if the repost cycle delay has been satisfied.
@@ -55,7 +61,7 @@ def _repost_cycle_ready(ad_cfg:Ad, ad_file_relative:str) -> bool:
if total_reposts <= delay_reposts:
remaining = (delay_reposts + 1) - total_reposts
LOG.info(
"Auto price reduction delayed for [%s]: waiting %s more reposts (completed %s, applied %s reductions)",
_("Auto price reduction delayed for [%s]: waiting %s more reposts (completed %s, applied %s reductions)"),
ad_file_relative,
max(remaining, 1), # Clamp to 1 to avoid showing "0 more reposts" when at threshold
total_reposts,
@@ -64,7 +70,9 @@ def _repost_cycle_ready(ad_cfg:Ad, ad_file_relative:str) -> bool:
return False
if eligible_cycles <= applied_cycles:
LOG.debug("Auto price reduction already applied for [%s]: %s reductions match %s eligible reposts", ad_file_relative, applied_cycles, eligible_cycles)
LOG.debug(
_("Auto price reduction already applied for [%s]: %s reductions match %s eligible reposts"), ad_file_relative, applied_cycles, eligible_cycles
)
return False
return True
@@ -175,6 +183,8 @@ class KleinanzeigenBot(WebScrapingMixin): # noqa: PLR0904
self.ads_selector = "due"
self.keep_old_ads = False
self._login_detection_diagnostics_captured:bool = False
def __del__(self) -> None:
if self.file_log:
self.file_log.close()
@@ -802,10 +812,15 @@ class KleinanzeigenBot(WebScrapingMixin): # noqa: PLR0904
if getattr(self, "page", None) is not None:
LOG.debug("Current page URL after opening homepage: %s", self.page.url)
if await self.is_logged_in():
state = await self.get_login_state()
if state == LoginState.LOGGED_IN:
LOG.info("Already logged in as [%s]. Skipping login.", self.config.login.username)
return
if state == LoginState.UNKNOWN:
LOG.warning("Login state is UNKNOWN - cannot determine if already logged in. Skipping login attempt.")
return
LOG.info("Opening login page...")
await self.web_open(f"{self.root_url}/m-einloggen.html?targetUrl=/")
@@ -813,12 +828,18 @@ class KleinanzeigenBot(WebScrapingMixin): # noqa: PLR0904
await self.handle_after_login_logic()
# Sometimes a second login is required
if not await self.is_logged_in():
state = await self.get_login_state()
if state == LoginState.UNKNOWN:
LOG.warning("Login state is UNKNOWN after first login attempt - cannot determine login status. Aborting login process.")
return
if state == LoginState.LOGGED_OUT:
LOG.debug("First login attempt did not succeed, trying second login attempt")
await self.fill_login_data_and_send()
await self.handle_after_login_logic()
if await self.is_logged_in():
state = await self.get_login_state()
if state == LoginState.LOGGED_IN:
LOG.debug("Second login attempt succeeded")
else:
LOG.warning("Second login attempt also failed - login may not have succeeded")
@@ -859,7 +880,120 @@ class KleinanzeigenBot(WebScrapingMixin): # noqa: PLR0904
# GDPR banner not shown within timeout.
pass
async def is_logged_in(self) -> bool:
async def _auth_probe_login_state(self) -> LoginState:
"""Probe an auth-required endpoint to classify login state.
The probe is non-mutating (GET request). It is used as a primary method by
get_login_state() to classify login state, falling back to DOM checks only when
the probe returns UNKNOWN.
"""
url = f"{self.root_url}/m-meine-anzeigen-verwalten.json?sort=DEFAULT"
try:
response = await self.web_request(url, valid_response_codes = [200, 401, 403])
except (TimeoutError, AssertionError):
# AssertionError can occur when web_request() fails to parse the response (e.g., unexpected content type)
# Treat both timeout and assertion failures as UNKNOWN to avoid false assumptions about login state
return LoginState.UNKNOWN
status_code = response.get("statusCode")
if status_code in {401, 403}:
return LoginState.LOGGED_OUT
content = response.get("content", "")
if not isinstance(content, str):
return LoginState.UNKNOWN
try:
payload = json.loads(content)
except json.JSONDecodeError:
lowered = content.lower()
if "m-einloggen" in lowered or "login-email" in lowered or "login-password" in lowered or "login-form" in lowered:
return LoginState.LOGGED_OUT
return LoginState.UNKNOWN
if isinstance(payload, dict) and "ads" in payload:
return LoginState.LOGGED_IN
return LoginState.UNKNOWN
async def get_login_state(self) -> LoginState:
"""Determine current login state using layered detection.
Order:
1) Server-side auth probe via `_auth_probe_login_state` (preferred)
2) DOM-based check via `is_logged_in(include_probe=False)`
3) If still inconclusive, capture diagnostics via
`_capture_login_detection_diagnostics_if_enabled` and return `UNKNOWN`
"""
# Prefer the deterministic, server-side auth probe first.
# SPA/hydration delays can cause DOM-based checks to temporarily miss login indicators.
state = await self._auth_probe_login_state()
if state != LoginState.UNKNOWN:
return state
# Fall back to DOM-based checks only when the probe is inconclusive.
if await self.is_logged_in(include_probe = False):
return LoginState.LOGGED_IN
await self._capture_login_detection_diagnostics_if_enabled()
return LoginState.UNKNOWN
def _diagnostics_output_dir(self) -> Path:
diagnostics = getattr(self.config, "diagnostics", None)
if diagnostics is not None and diagnostics.output_dir and diagnostics.output_dir.strip():
return Path(abspath(diagnostics.output_dir, relative_to = self.config_file_path)).resolve()
if self.installation_mode_or_portable == "xdg":
return xdg_paths.get_xdg_base_dir("cache") / "diagnostics"
return (Path.cwd() / ".temp" / "diagnostics").resolve()
async def _capture_login_detection_diagnostics_if_enabled(self) -> None:
diagnostics = getattr(self.config, "diagnostics", None)
if diagnostics is None or not diagnostics.login_detection_capture:
return
if self._login_detection_diagnostics_captured:
return
page = getattr(self, "page", None)
if page is None:
return
self._login_detection_diagnostics_captured = True
try:
out_dir = self._diagnostics_output_dir()
out_dir.mkdir(parents = True, exist_ok = True)
# Intentionally no username/PII in filename.
ts = misc.now().strftime("%Y%m%dT%H%M%S")
suffix = secrets.token_hex(4)
base = f"login_detection_unknown_{ts}_{suffix}"
screenshot_path = out_dir / f"{base}.png"
html_path = out_dir / f"{base}.html"
try:
await page.save_screenshot(str(screenshot_path))
except Exception as exc: # noqa: BLE001
LOG.debug("Login diagnostics screenshot capture failed: %s", exc)
try:
html = await page.get_content()
html_path.write_text(html, encoding = "utf-8")
except Exception as exc: # noqa: BLE001
LOG.debug("Login diagnostics HTML capture failed: %s", exc)
except Exception as exc: # noqa: BLE001
LOG.debug("Login diagnostics capture failed: %s", exc)
if getattr(diagnostics, "pause_on_login_detection_failure", False) and getattr(sys.stdin, "isatty", lambda: False)():
LOG.warning("############################################")
LOG.warning("# Login detection returned UNKNOWN. Browser is paused for manual inspection.")
LOG.warning("############################################")
await ainput(_("Press a key to continue..."))
async def is_logged_in(self, *, include_probe:bool = True) -> bool:
# Use login_detection timeout (10s default) instead of default (5s)
# to allow sufficient time for client-side JavaScript rendering after page load.
# This is especially important for older sessions (20+ days) that require
@@ -867,7 +1001,11 @@ class KleinanzeigenBot(WebScrapingMixin): # noqa: PLR0904
login_check_timeout = self._timeout("login_detection")
effective_timeout = self._effective_timeout("login_detection")
username = self.config.login.username.lower()
LOG.debug("Starting login detection (timeout: %.1fs base, %.1fs effective with multiplier/backoff)", login_check_timeout, effective_timeout)
LOG.debug(
"Starting login detection (timeout: %.1fs base, %.1fs effective with multiplier/backoff)",
login_check_timeout,
effective_timeout,
)
# Try to find the standard element first
try:
@@ -887,7 +1025,15 @@ class KleinanzeigenBot(WebScrapingMixin): # noqa: PLR0904
except TimeoutError:
LOG.debug("Timeout waiting for #user-email element after %.1fs", effective_timeout)
LOG.debug("No login detected - neither .mr-medium nor #user-email found with username")
if not include_probe:
LOG.debug("No login detected - neither .mr-medium nor #user-email found with username")
return False
state = await self._auth_probe_login_state()
if state == LoginState.LOGGED_IN:
return True
LOG.debug("No login detected - DOM elements not found and server probe returned %s", state.name)
return False
async def delete_ads(self, ad_cfgs:list[tuple[str, Ad, dict[str, Any]]]) -> None:
@@ -1384,7 +1530,10 @@ class KleinanzeigenBot(WebScrapingMixin): # noqa: PLR0904
await self.web_input(By.ID, "postad-phonenumber", contact.phone)
except TimeoutError:
LOG.warning(
"Phone number field not present on page. This is expected for many private accounts; commercial accounts may still support phone numbers."
_(
"Phone number field not present on page. This is expected for many private accounts; "
"commercial accounts may still support phone numbers."
)
)
async def update_ads(self, ad_cfgs:list[tuple[str, Ad, dict[str, Any]]]) -> None: