# SPDX-FileCopyrightText: © Sebastian Thomschke and contributors
# SPDX-License-Identifier: AGPL-3.0-or-later
# SPDX-ArtifactOfProjectHomePage: https://github.com/Second-Hand-Friends/kleinanzeigen-bot/
import asyncio, enum, inspect, json, os, platform, secrets, shutil, subprocess, urllib.request # isort: skip # noqa: S404
from collections.abc import Awaitable, Callable, Coroutine, Iterable, Sequence
from gettext import gettext as _
from pathlib import Path, PureWindowsPath
from typing import Any, Final, Optional, cast
try:
from typing import Never # type: ignore[attr-defined,unused-ignore] # mypy
except ImportError:
from typing import NoReturn as Never # Python <3.11
import nodriver, psutil # isort: skip
from typing import TYPE_CHECKING, TypeGuard
from nodriver.core.browser import Browser
from nodriver.core.config import Config as NodriverConfig
from nodriver.core.element import Element
from nodriver.core.tab import Tab as Page
from kleinanzeigen_bot.model.config_model import Config as BotConfig
from kleinanzeigen_bot.model.config_model import TimeoutConfig
from . import files, loggers, net, xdg_paths
from .chrome_version_detector import (
ChromeVersionInfo,
detect_chrome_version_from_binary,
detect_chrome_version_from_remote_debugging,
get_chrome_version_diagnostic_info,
validate_chrome_136_configuration,
)
from .misc import T, ensure
if TYPE_CHECKING:
from nodriver.cdp.runtime import RemoteObject
# Constants for RemoteObject conversion
_KEY_VALUE_PAIR_SIZE:Final[int] = 2  # a serialized map entry is a [key, value] pair
# Budget split used by _allocate_selector_group_budgets: the first (primary) selector gets the
# preferred share of the group timeout; backup selectors get a floor/cap-bounded slice each.
_PRIMARY_SELECTOR_BUDGET_RATIO:Final[float] = 0.70
_BACKUP_SELECTOR_BUDGET_CAP_SECONDS:Final[float] = 0.75
_BACKUP_SELECTOR_BUDGET_FLOOR_SECONDS:Final[float] = 0.25
def _resolve_user_data_dir_paths(arg_value:str, config_value:str) -> tuple[Any, Any]:
"""Resolve the argument and config user_data_dir paths for comparison."""
try:
return (
Path(arg_value).expanduser().resolve(),
Path(config_value).expanduser().resolve(),
)
except OSError as exc:
LOG.debug("Failed to resolve user_data_dir paths for comparison: %s", exc)
return None, None
def _has_non_empty_user_data_dir_arg(args:Iterable[str]) -> bool:
for arg in args:
if not arg.startswith("--user-data-dir="):
continue
raw = arg.split("=", maxsplit = 1)[1].strip().strip('"').strip("'")
if raw:
return True
return False
def _is_remote_object(obj:Any) -> TypeGuard["RemoteObject"]:
"""Type guard to check if an object is a RemoteObject."""
return hasattr(obj, "__class__") and "RemoteObject" in str(type(obj))
# Public API of this module.
__all__ = [
    "Browser",
    "BrowserConfig",
    "By",
    "Element",
    "Page",
    "Is",
    "WebScrapingMixin",
]

# Module-level logger.
LOG:Final[loggers.Logger] = loggers.get_logger(__name__)

# see https://api.jquery.com/category/selectors/
# Translation table that backslash-escapes CSS/jQuery selector metacharacters via str.translate().
METACHAR_ESCAPER:Final[dict[int, str]] = str.maketrans({ch: f"\\{ch}" for ch in "!\"#$%&'()*+,./:;<=>?@[\\]^`{|}~"})
def _is_admin() -> bool:
"""Check if the current process is running with admin/root privileges."""
try:
if hasattr(os, "geteuid"):
result = os.geteuid() == 0
return bool(result)
return False
except AttributeError:
return False
class By(enum.Enum):
    """Selector strategies accepted by the web_* element-lookup helpers."""
    ID = enum.auto()
    CLASS_NAME = enum.auto()
    CSS_SELECTOR = enum.auto()
    TAG_NAME = enum.auto()
    TEXT = enum.auto()
    XPATH = enum.auto()
class Is(enum.Enum):
    """Element states that can be queried via WebScrapingMixin.web_check()."""
    CLICKABLE = enum.auto()
    DISPLAYED = enum.auto()
    DISABLED = enum.auto()
    READONLY = enum.auto()
    SELECTED = enum.auto()
class BrowserConfig:
    """Mutable holder for browser launch settings consumed by WebScrapingMixin."""

    def __init__(self) -> None:
        # Location and identity of the browser installation:
        self.binary_location:str | None = None
        self.user_data_dir:str | None = None
        self.profile_name:str | None = None
        # Launch behavior:
        self.arguments:Iterable[str] = []
        self.extensions:Iterable[str] = []
        self.use_private_window:bool = True
def _write_initial_prefs(prefs_file:str) -> None:
with open(prefs_file, "w", encoding = "UTF-8") as fd:
json.dump(
{
"credentials_enable_service": False,
"enable_do_not_track": True,
"google": {"services": {"consented_to_sync": False}},
"profile": {
"default_content_setting_values": {
"popups": 0,
"notifications": 2, # 1 = allow, 2 = block browser notifications
},
"password_manager_enabled": False,
},
"signin": {"allowed": False},
"translate_site_blacklist": ["www.kleinanzeigen.de"],
"devtools": {"preferences": {"currentDockState": '"bottom"'}},
},
fd,
)
class WebScrapingMixin:
def __init__(self) -> None:
self.browser_config:Final[BrowserConfig] = BrowserConfig()
self.browser:Browser = None # pyright: ignore[reportAttributeAccessIssue]
self.page:Page = None # pyright: ignore[reportAttributeAccessIssue]
self._default_timeout_config:TimeoutConfig | None = None
self.config:BotConfig = cast(BotConfig, None)
def _get_timeout_config(self) -> TimeoutConfig:
config = getattr(self, "config", None)
timeouts:TimeoutConfig | None = None
if config is not None:
timeouts = cast(Optional[TimeoutConfig], getattr(config, "timeouts", None))
if timeouts is not None:
return timeouts
if self._default_timeout_config is None:
self._default_timeout_config = TimeoutConfig()
return self._default_timeout_config
def _timeout(self, key:str = "default", override:float | None = None) -> float:
"""
Return the base timeout (seconds) for a given key without applying multipliers.
"""
return self._get_timeout_config().resolve(key, override)
def _effective_timeout(self, key:str = "default", override:float | None = None, *, attempt:int = 0) -> float:
"""
Return the effective timeout (seconds) with multiplier/backoff applied.
"""
return self._get_timeout_config().effective(key, override, attempt = attempt)
def _timeout_attempts(self) -> int:
cfg = self._get_timeout_config()
if not cfg.retry_enabled:
return 1
# Always perform the initial attempt plus the configured number of retries.
return 1 + cfg.retry_max_attempts
def _record_timing(
self,
*,
key:str,
description:str,
configured_timeout:float,
effective_timeout:float,
actual_duration:float,
attempt_index:int,
success:bool,
) -> None:
collector = getattr(self, "_timing_collector", None)
if collector is None:
return
operation_type = description.split("(", 1)[0] if "(" in description else description
try:
collector.record(
key = key,
operation_type = operation_type,
description = description,
configured_timeout = configured_timeout,
effective_timeout = effective_timeout,
actual_duration = actual_duration,
attempt_index = attempt_index,
success = success,
)
except Exception as exc: # noqa: BLE001
LOG.warning("Timing collector failed for key=%s operation=%s: %s", key, operation_type, exc)
    async def _run_with_timeout_retries(
        self, operation:Callable[[float], Awaitable[T]], *, description:str, key:str = "default", override:float | None = None
    ) -> T:
        """
        Execute an async callable with retry/backoff handling for TimeoutError.

        :param operation: callable receiving the effective timeout (seconds) for the current attempt
        :param description: human-readable operation label used for logging and timing records
        :param key: timeout configuration key
        :param override: optional base-timeout override in seconds
        :return: the result of the first successful attempt
        :raises TimeoutError: re-raised from the final attempt once all retries are exhausted
        """
        attempts = self._timeout_attempts()
        configured_timeout = self._timeout(key, override)
        loop = asyncio.get_running_loop()
        for attempt in range(attempts):
            # Effective timeout may grow per attempt (multiplier/backoff via TimeoutConfig.effective).
            effective_timeout = self._effective_timeout(key, override, attempt = attempt)
            attempt_started = loop.time()
            try:
                result = await operation(effective_timeout)
                self._record_timing(
                    key = key,
                    description = description,
                    configured_timeout = configured_timeout,
                    effective_timeout = effective_timeout,
                    actual_duration = loop.time() - attempt_started,
                    attempt_index = attempt,
                    success = True,
                )
                return result
            except TimeoutError:
                # Record the failed attempt before deciding whether to retry or propagate.
                self._record_timing(
                    key = key,
                    description = description,
                    configured_timeout = configured_timeout,
                    effective_timeout = effective_timeout,
                    actual_duration = loop.time() - attempt_started,
                    attempt_index = attempt,
                    success = False,
                )
                if attempt >= attempts - 1:
                    # Final attempt: let the TimeoutError propagate to the caller.
                    raise
                LOG.debug("Retrying %s after TimeoutError (attempt %d/%d, timeout %.1fs)", description, attempt + 1, attempts, effective_timeout)
        # Defensive guard: unreachable as long as attempts >= 1.
        raise TimeoutError(f"{description} failed without executing operation")
    @staticmethod
    def _allocate_selector_group_budgets(total_timeout:float, selector_count:int) -> list[float]:
        """Allocate a shared timeout budget across selector alternatives.

        Strategy:
        - Give the first selector a preferred share via `_PRIMARY_SELECTOR_BUDGET_RATIO`.
        - Keep a minimum floor `_BACKUP_SELECTOR_BUDGET_FLOOR_SECONDS` per selector.
        - Cap backup slices with `_BACKUP_SELECTOR_BUDGET_CAP_SECONDS`.
        - Reassign final-backup surplus to the primary slot to preserve total timeout.

        :param total_timeout: overall budget in seconds to split across all selectors
        :param selector_count: number of selector alternatives (must be > 0)
        :return: per-selector budgets in seconds, primary slice first
        :raises ValueError: if selector_count <= 0
        """
        if selector_count <= 0:
            raise ValueError(_("selector_count must be > 0"))
        if selector_count == 1:
            # A single selector receives the whole (non-negative) budget.
            return [max(total_timeout, 0.0)]
        if total_timeout <= 0:
            return [0.0 for _ in range(selector_count)]
        # If total_timeout cannot satisfy per-slot floor, split equally to preserve total budget.
        floor_total = _BACKUP_SELECTOR_BUDGET_FLOOR_SECONDS * selector_count
        if total_timeout < floor_total:
            equal_share = total_timeout / selector_count
            return [equal_share for _ in range(selector_count)]
        # Reserve minimum floor for backups before sizing the primary slice.
        reserve_for_backups = _BACKUP_SELECTOR_BUDGET_FLOOR_SECONDS * (selector_count - 1)
        # Primary gets preferred ratio, but never steals the reserved backup floors.
        primary = min(total_timeout * _PRIMARY_SELECTOR_BUDGET_RATIO, total_timeout - reserve_for_backups)
        primary = max(primary, _BACKUP_SELECTOR_BUDGET_FLOOR_SECONDS)
        budgets = [primary]
        remaining = total_timeout - primary
        for index in range(selector_count - 1):
            is_last_backup = index == selector_count - 2
            if is_last_backup:
                # Last backup is capped; any surplus is folded back into primary to keep sum == total_timeout.
                alloc = min(remaining, _BACKUP_SELECTOR_BUDGET_CAP_SECONDS)
                budgets.append(alloc)
                surplus = remaining - alloc
                if surplus > 0:
                    budgets[0] += surplus
                continue
            remaining_slots_after_this = selector_count - len(budgets) - 1
            # Keep floor reserve for remaining backups, then clamp this slice to floor/cap bounds.
            min_reserve = _BACKUP_SELECTOR_BUDGET_FLOOR_SECONDS * remaining_slots_after_this
            alloc = remaining - min_reserve
            alloc = max(_BACKUP_SELECTOR_BUDGET_FLOOR_SECONDS, alloc)
            alloc = min(_BACKUP_SELECTOR_BUDGET_CAP_SECONDS, alloc)
            budgets.append(alloc)
            remaining -= alloc
        return budgets
    async def web_find_first_available(
        self,
        selectors:Sequence[tuple[By, str]],
        *,
        parent:Element | None = None,
        timeout:int | float | None = None,
        key:str = "default",
        description:str | None = None,
    ) -> tuple[Element, int]:
        """
        Find the first matching selector from an ordered group using a shared timeout budget.

        :param selectors: ordered (selector_type, selector_value) alternatives; first match wins
        :param parent: optional element to search within instead of the whole page
        :param timeout: optional base-timeout override in seconds
        :param key: timeout configuration key
        :param description: label for logging/timing; defaults to a generated one
        :return: the matched element and the index of the selector that matched
        :raises ValueError: if selectors is empty
        :raises TimeoutError: if no alternative matched within the (possibly retried) budget
        """
        if not selectors:
            raise ValueError(_("selectors must contain at least one selector"))

        async def attempt(effective_timeout:float) -> tuple[Element, int]:
            # Split the group budget across alternatives (primary selector gets the biggest slice).
            budgets = self._allocate_selector_group_budgets(effective_timeout, len(selectors))
            failures:list[str] = []
            for index, ((selector_type, selector_value), candidate_timeout) in enumerate(zip(selectors, budgets, strict = True)):
                try:
                    element = await self._web_find_once(selector_type, selector_value, candidate_timeout, parent = parent)
                    LOG.debug(
                        "Selector group matched candidate %d/%d (%s=%s) within %.2fs (group budget %.2fs)",
                        index + 1,
                        len(selectors),
                        selector_type.name,
                        selector_value,
                        candidate_timeout,
                        effective_timeout,
                    )
                    return element, index
                except TimeoutError as exc:
                    # Remember the failure and fall through to the next alternative.
                    failures.append(str(exc))
                    LOG.debug(
                        "Selector group candidate %d/%d timed out (%s=%s) after %.2fs (group budget %.2fs)",
                        index + 1,
                        len(selectors),
                        selector_type.name,
                        selector_value,
                        candidate_timeout,
                        effective_timeout,
                    )
            failure_summary = failures[-1] if failures else _("No selector candidates executed.")
            raise TimeoutError(
                _(
                    "No HTML element found using selector group after trying %(count)d alternatives within %(timeout)s seconds."
                    " Last error: %(error)s"
                )
                % {"count": len(selectors), "timeout": effective_timeout, "error": failure_summary}
            )

        attempt_description = description or f"web_find_first_available({len(selectors)} selectors)"
        return await self._run_with_timeout_retries(attempt, description = attempt_description, key = key, override = timeout)
async def web_text_first_available(
self,
selectors:Sequence[tuple[By, str]],
*,
parent:Element | None = None,
timeout:int | float | None = None,
key:str = "default",
description:str | None = None,
) -> tuple[str, int]:
"""
Return visible text from the first selector that resolves from a selector group.
"""
element, matched_index = await self.web_find_first_available(
selectors,
parent = parent,
timeout = timeout,
key = key,
description = description,
)
text = await self._extract_visible_text(element)
return text, matched_index
async def _extract_visible_text(self, element:Element) -> str:
"""Return visible text for a DOM element using user-selection extraction."""
return str(
await element.apply("""
function (elem) {
let sel = window.getSelection()
sel.removeAllRanges()
let range = document.createRange()
range.selectNode(elem)
sel.addRange(range)
let visibleText = sel.toString().trim()
sel.removeAllRanges()
return visibleText
}
""")
)
    async def create_browser_session(self) -> None:
        """
        Create a browser session: either attach to an already-running browser exposed via
        --remote-debugging-port, or configure and launch a new browser instance
        (arguments, profile, preferences, extensions) and connect to it.
        """
        LOG.info("Creating Browser session...")
        if self.browser_config.binary_location:
            ensure(await files.exists(self.browser_config.binary_location), f"Specified browser binary [{self.browser_config.binary_location}] does not exist.")
        else:
            # No binary configured: auto-detect an installed Chromium-based browser.
            self.browser_config.binary_location = self.get_compatible_browser()
        LOG.info(" -> Browser binary location: %s", self.browser_config.binary_location)
        has_remote_debugging = any(arg.startswith("--remote-debugging-port=") for arg in self.browser_config.arguments)
        is_test_environment = bool(os.environ.get("PYTEST_CURRENT_TEST"))
        if (
            not (self.browser_config.user_data_dir and self.browser_config.user_data_dir.strip())
            and not _has_non_empty_user_data_dir_arg(self.browser_config.arguments)
            and not has_remote_debugging
            and not is_test_environment
        ):
            LOG.debug("No browser user_data_dir configured. Set browser.user_data_dir or --user-data-dir for non-test runs.")
        # Chrome version detection and validation
        if has_remote_debugging:
            # With remote debugging the local binary may differ from the remote browser,
            # so a failed validation is only a warning here.
            try:
                await self._validate_chrome_version_configuration()
            except AssertionError as exc:
                LOG.warning("Remote debugging detected, but browser configuration looks invalid: %s", exc)
        else:
            await self._validate_chrome_version_configuration()

        ########################################################
        # check if an existing browser instance shall be used...
        ########################################################
        remote_host = "127.0.0.1"
        remote_port = 0
        for arg in self.browser_config.arguments:
            if arg.startswith("--remote-debugging-host="):
                remote_host = arg.split("=", maxsplit = 1)[1]
            if arg.startswith("--remote-debugging-port="):
                remote_port = int(arg.split("=", maxsplit = 1)[1])
        if remote_port > 0:
            LOG.info("Using existing browser process at %s:%s", remote_host, remote_port)
            # Enhanced port checking with retry logic
            port_available = await self._check_port_with_retry(remote_host, remote_port)
            ensure(
                port_available,
                f"Browser process not reachable at {remote_host}:{remote_port}. "
                f"Start the browser with --remote-debugging-port={remote_port} or remove this port from your config.yaml. "
                f"Make sure the browser is running and the port is not blocked by firewall.",
            )
            try:
                cfg = NodriverConfig(
                    browser_executable_path = self.browser_config.binary_location  # actually not necessary but nodriver fails without
                )
                cfg.host = remote_host
                cfg.port = remote_port
                self.browser = await nodriver.start(cfg)  # type: ignore[attr-defined]
                LOG.info("New Browser session is %s", self.browser.websocket_url)
                return
            except Exception as e:
                error_msg = str(e)
                if "root" in error_msg.lower():
                    LOG.error("Failed to connect to browser. This error often occurs when:")
                    LOG.error("1. Running as root user (try running as regular user)")
                    LOG.error("2. Browser profile is locked or in use by another process")
                    LOG.error("3. Insufficient permissions to access the browser profile")
                    LOG.error("4. Browser is not properly started with remote debugging enabled")
                    LOG.error("")
                    LOG.error("Troubleshooting steps:")
                    LOG.error("1. Close all browser instances and try again")
                    LOG.error("2. Remove the user_data_dir configuration temporarily")
                    LOG.error("3. Start browser manually with: %s --remote-debugging-port=%d", self.browser_config.binary_location, remote_port)
                    LOG.error("4. Check if any antivirus or security software is blocking the connection")
                raise

        ########################################################
        # configure and initialize new browser instance...
        ########################################################

        # default_browser_args: @ https://github.com/ultrafunkamsterdam/nodriver/blob/main/nodriver/core/config.py
        # https://peter.sh/experiments/chromium-command-line-switches/
        # https://github.com/GoogleChrome/chrome-launcher/blob/main/docs/chrome-flags-for-tools.md
        browser_args = [
            # "--disable-dev-shm-usage", # https://stackoverflow.com/a/50725918/5116073
            "--disable-crash-reporter",
            "--disable-domain-reliability",
            "--disable-sync",
            "--no-experiments",
            "--disable-search-engine-choice-screen",
            "--disable-features=MediaRouter",
            "--use-mock-keychain",
            "--test-type",  # https://stackoverflow.com/a/36746675/5116073
            # https://chromium.googlesource.com/chromium/src/+/master/net/dns/README.md#request-remapping
            '--host-resolver-rules="MAP connect.facebook.net 127.0.0.1, MAP securepubads.g.doubleclick.net 127.0.0.1, MAP www.googletagmanager.com 127.0.0.1"',
        ]
        is_edge = "edge" in self.browser_config.binary_location.lower()
        if is_edge:
            os.environ["MSEDGEDRIVER_TELEMETRY_OPTOUT"] = "1"  # https://docs.microsoft.com/en-us/microsoft-edge/privacy-whitepaper/#microsoft-edge-driver
        if self.browser_config.use_private_window:
            # Edge and Chrome/Chromium use different private-window switches.
            browser_args.append("-inprivate" if is_edge else "--incognito")
        if self.browser_config.profile_name:
            LOG.info(" -> Browser profile name: %s", self.browser_config.profile_name)
            browser_args.append(f"--profile-directory={self.browser_config.profile_name}")
        user_data_dir_from_args:str | None = None
        for browser_arg in self.browser_config.arguments:
            LOG.info(" -> Custom Browser argument: %s", browser_arg)
            if browser_arg.startswith("--user-data-dir="):
                # --user-data-dir is handled separately below instead of being passed through.
                raw = browser_arg.split("=", maxsplit = 1)[1].strip().strip('"').strip("'")
                if not raw:
                    LOG.warning("Ignoring empty --user-data-dir= argument; falling back to configured user_data_dir.")
                    continue
                user_data_dir_from_args = raw
                continue
            browser_args.append(browser_arg)
        # The command-line argument wins over the configured value.
        effective_user_data_dir = user_data_dir_from_args or self.browser_config.user_data_dir
        if user_data_dir_from_args and self.browser_config.user_data_dir:
            # Path resolution touches the filesystem -> run it off the event loop.
            arg_path, cfg_path = await asyncio.get_running_loop().run_in_executor(
                None,
                _resolve_user_data_dir_paths,
                user_data_dir_from_args,
                self.browser_config.user_data_dir,
            )
            if arg_path is None or cfg_path is None or arg_path != cfg_path:
                LOG.warning(
                    "Configured browser.user_data_dir (%s) does not match --user-data-dir argument (%s); using the argument value.",
                    self.browser_config.user_data_dir,
                    user_data_dir_from_args,
                )
        if not effective_user_data_dir and not is_test_environment:
            LOG.debug("No effective browser user_data_dir found. Browser will use its default profile location.")
        self.browser_config.user_data_dir = effective_user_data_dir
        if not loggers.is_debug(LOG):
            browser_args.append("--log-level=3")  # INFO: 0, WARNING: 1, ERROR: 2, FATAL: 3
        if self.browser_config.user_data_dir:
            LOG.info(" -> Browser user data dir: %s", self.browser_config.user_data_dir)
        cfg = NodriverConfig(
            headless = False,
            browser_executable_path = self.browser_config.binary_location,
            browser_args = browser_args,
            user_data_dir = self.browser_config.user_data_dir,
        )
        # When --no-sandbox is in browser_args, nodriver's Config.sandbox must also be set to False.
        # Otherwise nodriver re-adds --no-sandbox itself but still runs internal sandbox-related logic
        # that can cause startup failures in containerized environments (Docker, LXC, etc.).
        if any(arg == "--no-sandbox" for arg in browser_args):
            cfg.sandbox = False

        # already logged by nodriver:
        # LOG.debug("-> Effective browser arguments: \n\t\t%s", "\n\t\t".join(cfg.browser_args))

        # Enhanced profile directory handling
        if cfg.user_data_dir:
            xdg_paths.ensure_directory(Path(cfg.user_data_dir), "browser profile directory")
            profile_dir = os.path.join(cfg.user_data_dir, self.browser_config.profile_name or "Default")
            os.makedirs(profile_dir, exist_ok = True)
            prefs_file = os.path.join(profile_dir, "Preferences")
            if not await files.exists(prefs_file):
                LOG.info(" -> Setting chrome prefs [%s]...", prefs_file)
                # File writing is blocking I/O -> run it off the event loop.
                await asyncio.get_running_loop().run_in_executor(None, _write_initial_prefs, prefs_file)

        # load extensions
        for crx_extension in self.browser_config.extensions:
            LOG.info(" -> Adding Browser extension: [%s]", crx_extension)
            ensure(await files.exists(crx_extension), f"Configured extension-file [{crx_extension}] does not exist.")
            cfg.add_extension(crx_extension)
        try:
            self.browser = await nodriver.start(cfg)  # type: ignore[attr-defined]
            LOG.info("New Browser session is %s", self.browser.websocket_url)
        except Exception as e:
            # Clean up any resources that were created during setup
            self._cleanup_session_resources()
            error_msg = str(e)
            if "root" in error_msg.lower():
                LOG.error("Failed to start browser. This error often occurs when:")
                LOG.error("1. Running as root user (try running as regular user)")
                LOG.error("2. Browser profile is locked or in use by another process")
                LOG.error("3. Insufficient permissions to access the browser profile")
                LOG.error("4. Browser binary is not executable or missing")
                LOG.error("")
                LOG.error("Troubleshooting steps:")
                LOG.error("1. Close all browser instances and try again")
                LOG.error("2. Remove the user_data_dir configuration temporarily")
                LOG.error("3. Try running without profile configuration")
                LOG.error("4. Check browser binary permissions: %s", self.browser_config.binary_location)
                LOG.error("5. Check if any antivirus or security software is blocking the browser")
            raise
async def _check_port_with_retry(self, host:str, port:int, max_retries:int = 3, retry_delay:float = 1.0) -> bool:
"""
Check if a port is open with retry logic.
Args:
host: Host to check
port: Port to check
max_retries: Maximum number of retry attempts
retry_delay: Delay between retries in seconds
Returns:
True if port is open, False otherwise
"""
for attempt in range(max_retries):
if net.is_port_open(host, port):
return True
if attempt < max_retries - 1:
LOG.debug("Port %s:%s not available, retrying in %.1f seconds (attempt %d/%d)", host, port, retry_delay, attempt + 1, max_retries)
await asyncio.sleep(retry_delay)
return False
    def diagnose_browser_issues(self) -> None:
        """
        Diagnose common browser connection issues and provide troubleshooting information.

        Logs findings only; does not raise. As a side effect it may set
        browser_config.binary_location when a browser is auto-detected.
        """
        LOG.info("=== Browser Connection Diagnostics ===")
        # Check browser binary
        if self.browser_config.binary_location:
            if os.path.exists(self.browser_config.binary_location):
                LOG.info("(ok) Browser binary exists: %s", self.browser_config.binary_location)
                if os.access(self.browser_config.binary_location, os.X_OK):
                    LOG.info("(ok) Browser binary is executable")
                else:
                    LOG.error("(fail) Browser binary is not executable")
            else:
                LOG.error("(fail) Browser binary not found: %s", self.browser_config.binary_location)
        else:
            try:
                browser_path = self.get_compatible_browser()
            except AssertionError as exc:
                LOG.debug("Browser auto-detection failed: %s", exc)
                browser_path = None
            if browser_path:
                LOG.info("(ok) Auto-detected browser: %s", browser_path)
                # Set the binary location for Chrome version detection
                self.browser_config.binary_location = browser_path
            else:
                LOG.error("(fail) No compatible browser found")
        # Check user data directory
        if self.browser_config.user_data_dir:
            if os.path.exists(self.browser_config.user_data_dir):
                LOG.info("(ok) User data directory exists: %s", self.browser_config.user_data_dir)
                if os.access(self.browser_config.user_data_dir, os.R_OK | os.W_OK):
                    LOG.info("(ok) User data directory is readable and writable")
                else:
                    LOG.error("(fail) User data directory permissions issue")
            else:
                LOG.info("(info) User data directory does not exist (will be created): %s", self.browser_config.user_data_dir)
        # Check for remote debugging port
        remote_port = 0
        for arg in self.browser_config.arguments:
            if arg.startswith("--remote-debugging-port="):
                remote_port = int(arg.split("=", maxsplit = 1)[1])
                break
        if remote_port > 0:
            LOG.info("(info) Remote debugging port configured: %d", remote_port)
            if net.is_port_open("127.0.0.1", remote_port):
                LOG.info("(ok) Remote debugging port is open")
                # Try to get more information about the debugging endpoint
                try:
                    probe_timeout = self._effective_timeout("chrome_remote_probe")
                    response = urllib.request.urlopen(f"http://127.0.0.1:{remote_port}/json/version", timeout = probe_timeout)
                    version_info = json.loads(response.read().decode())
                    LOG.info("(ok) Remote debugging API accessible - Browser: %s", version_info.get("Browser", "Unknown"))
                except Exception as e:
                    LOG.warning("(fail) Remote debugging port is open but API not accessible: %s", str(e))
                    LOG.info("   This might indicate a browser update issue or configuration problem")
            else:
                LOG.info("(info) Remote debugging port is not open")
        # Check for running browser processes
        browser_processes = []
        target_browser_name = ""
        # Get the target browser name for comparison
        if self.browser_config.binary_location:
            target_browser_name = os.path.basename(self.browser_config.binary_location).lower()
        else:
            try:
                target_browser_path = self.get_compatible_browser()
                target_browser_name = os.path.basename(target_browser_path).lower()
            except (AssertionError, TypeError):
                target_browser_name = ""
        try:
            for proc in psutil.process_iter(["pid", "name", "cmdline"]):
                try:
                    proc_name = proc.info["name"] or ""
                    cmdline = proc.info["cmdline"] or []
                    # Check if this is a browser process relevant to our diagnostics
                    is_relevant_browser = False
                    # Is this the target browser?
                    is_target_browser = target_browser_name and target_browser_name in proc_name.lower()
                    # Does it have remote debugging?
                    has_remote_debugging = cmdline and any(arg.startswith("--remote-debugging-port=") for arg in cmdline)
                    # Detect target browser processes for diagnostics
                    if is_target_browser:
                        is_relevant_browser = True
                        # Add debugging status to the process info for better diagnostics
                        proc.info["has_remote_debugging"] = has_remote_debugging
                    if is_relevant_browser:
                        browser_processes.append(proc.info)
                except (psutil.NoSuchProcess, psutil.AccessDenied):
                    # Process ended or is not accessible; skip it.
                    pass
        except (psutil.Error, PermissionError) as exc:
            # Process iteration itself failed; continue diagnostics without process info.
            LOG.warning("(warn) Unable to inspect browser processes: %s", exc)
            browser_processes = []
        if browser_processes:
            LOG.info("(info) Found %d browser processes running", len(browser_processes))
            for proc in browser_processes[:3]:  # Show first 3
                has_debugging = proc.get("has_remote_debugging", False)
                if has_debugging:
                    LOG.info(" - PID %d: %s (remote debugging enabled)", proc["pid"], proc["name"])
                else:
                    LOG.warning(" - PID %d: %s (remote debugging NOT enabled)", proc["pid"], proc["name"])
        else:
            LOG.info("(info) No browser processes currently running")
        if platform.system() == "Linux":
            if _is_admin():
                LOG.error("(fail) Running as root - this can cause browser issues")
        # Chrome version detection and validation
        self._diagnose_chrome_version_issues(remote_port)
        LOG.info("=== End Diagnostics ===")
def close_browser_session(self) -> None:
if self.browser:
LOG.debug("Closing Browser session...")
self.page = None # pyright: ignore[reportAttributeAccessIssue]
browser_process = psutil.Process(self.browser._process_pid) # noqa: SLF001 Private member accessed
browser_children:list[psutil.Process] = browser_process.children()
self.browser.stop()
for p in browser_children:
if p.is_running():
p.kill() # terminate orphaned browser processes
self.browser = None # pyright: ignore[reportAttributeAccessIssue]
def _cleanup_session_resources(self) -> None:
"""Clean up any resources that were created during session setup."""
# Reset browser and page references
self.browser = None # pyright: ignore[reportAttributeAccessIssue]
self.page = None # pyright: ignore[reportAttributeAccessIssue]
    def get_compatible_browser(self) -> str:
        """
        Auto-detect an installed Chromium-based browser (Chromium, Chrome, Edge) for the current OS.

        :return: path of the first existing browser binary found
        :raises AssertionError: if the OS is unsupported or no browser could be located
        """
        browser_paths:list[str | None] = []
        match platform.system():
            case "Linux":
                browser_paths = [shutil.which("chromium"), shutil.which("chromium-browser"), shutil.which("google-chrome"), shutil.which("microsoft-edge")]
            case "Darwin":
                browser_paths = [
                    "/Applications/Chromium.app/Contents/MacOS/Chromium",
                    "/Applications/Google Chrome.app/Contents/MacOS/Google Chrome",
                    "/Applications/Microsoft Edge.app/Contents/MacOS/Microsoft Edge",
                ]
            case "Windows":

                def win_path(*parts:str) -> str:
                    # Join path segments using Windows separators regardless of host OS.
                    return str(PureWindowsPath(*parts))

                program_files = os.environ.get("PROGRAMFILES", "C:\\Program Files")
                program_files_x86 = os.environ.get("PROGRAMFILES(X86)", "C:\\Program Files (x86)")
                local_app_data = os.environ.get("LOCALAPPDATA")
                if not local_app_data:
                    # Derive %LOCALAPPDATA% from %USERPROFILE% when the variable is missing.
                    user_profile = os.environ.get("USERPROFILE")
                    if user_profile:
                        local_app_data = win_path(user_profile, "AppData", "Local")
                browser_paths = [
                    win_path(local_app_data, "Google", "Chrome", "Application", "chrome.exe") if local_app_data else None,
                    win_path(program_files, "Google", "Chrome", "Application", "chrome.exe"),
                    win_path(program_files_x86, "Google", "Chrome", "Application", "chrome.exe"),
                    win_path(local_app_data, "Microsoft", "Edge", "Application", "msedge.exe") if local_app_data else None,
                    win_path(program_files, "Microsoft", "Edge", "Application", "msedge.exe"),
                    win_path(program_files_x86, "Microsoft", "Edge", "Application", "msedge.exe"),
                    win_path(local_app_data, "Chromium", "Application", "chrome.exe") if local_app_data else None,
                    win_path(program_files, "Chromium", "Application", "chrome.exe"),
                    win_path(program_files_x86, "Chromium", "Application", "chrome.exe"),
                    # Intentional fallback for portable/custom distributions installed under a bare "Chrome" directory.
                    win_path(program_files, "Chrome", "Application", "chrome.exe"),
                    win_path(program_files_x86, "Chrome", "Application", "chrome.exe"),
                    win_path(local_app_data, "Chrome", "Application", "chrome.exe") if local_app_data else None,
                    shutil.which("msedge.exe"),
                    shutil.which("chromium.exe"),
                    shutil.which("chrome.exe"),
                ]
            case _ as os_name:
                raise AssertionError(_("Installed browser for OS %s could not be detected") % os_name)
        # Return the first candidate that actually exists on disk.
        for browser_path in browser_paths:
            if browser_path and os.path.isfile(browser_path):
                return browser_path
        raise AssertionError(_("Installed browser could not be detected"))
    async def web_await(
        self,
        condition:Callable[[], T | Never | Coroutine[Any, Any, T | Never]],
        *,
        timeout:int | float | None = None,
        timeout_error_message:str = "",
        apply_multiplier:bool = True,
    ) -> T:
        """
        Blocks/waits until the given condition is met.

        :param condition: sync or async callable; waiting stops once it returns a truthy value
        :param timeout: timeout in seconds (base value, multiplier applied unless disabled)
        :param timeout_error_message: custom message for the raised TimeoutError
        :param apply_multiplier: when False, use the base timeout without the configured multiplier
        :raises TimeoutError: if element could not be found within time
        """
        loop = asyncio.get_running_loop()
        start_at = loop.time()
        base_timeout = timeout if timeout is not None else self._timeout()
        effective_timeout = self._effective_timeout(override = base_timeout) if apply_multiplier else base_timeout
        while True:
            await self.page
            ex:Exception | None = None
            try:
                result_raw = condition()
                result:T = cast(T, await result_raw if inspect.isawaitable(result_raw) else result_raw)
                # NOTE: truthiness check — falsy results (0, "", None) keep the wait going.
                if result:
                    return result
            except Exception as ex1:
                # Remember the exception; it is surfaced instead of a generic timeout on deadline.
                ex = ex1
            elapsed = loop.time() - start_at
            if elapsed >= effective_timeout:
                if ex:
                    raise ex
                raise TimeoutError(timeout_error_message or f"Condition not met within {effective_timeout} seconds")
            remaining_timeout = max(effective_timeout - elapsed, 0.0)
            # Poll at most twice per second, but never sleep past the deadline.
            await self.page.sleep(min(0.5, remaining_timeout))
    async def web_check(self, selector_type:By, selector_value:str, attr:Is, *, timeout:int | float | None = None) -> bool:
        """
        Locates an HTML element and returns a state.

        :param selector_type: selector strategy to locate the element
        :param selector_value: selector value matching the chosen strategy
        :param attr: the element state to check (see Is)
        :param timeout: timeout in seconds
        :raises TimeoutError: if element could not be found within time
        """
        def is_disabled(elem:Element) -> bool:
            # An element is disabled when the "disabled" attribute is present (any value).
            return elem.attrs.get("disabled") is not None

        async def is_displayed(elem:Element) -> bool:
            return cast(
                bool,
                await elem.apply("""
                    function (element) {
                        var style = window.getComputedStyle(element);
                        return style.display !== 'none'
                            && style.visibility !== 'hidden'
                            && style.opacity !== '0'
                            && element.offsetWidth > 0
                            && element.offsetHeight > 0
                    }
                """),
            )

        elem:Element = await self.web_find(selector_type, selector_value, timeout = timeout)
        match attr:
            case Is.CLICKABLE:
                # NOTE(review): 'or' reports disabled-but-displayed elements as clickable —
                # verify this is intended (intuitively this would be 'and').
                return not is_disabled(elem) or await is_displayed(elem)
            case Is.DISPLAYED:
                return await is_displayed(elem)
            case Is.DISABLED:
                return is_disabled(elem)
            case Is.READONLY:
                return elem.attrs.get("readonly") is not None
            case Is.SELECTED:
                # Only checkbox/radio inputs can report a checked state; everything else is False.
                return cast(
                    bool,
                    await elem.apply("""
                        function (element) {
                            if (element.tagName.toLowerCase() === 'input') {
                                if (element.type === 'checkbox' || element.type === 'radio') {
                                    return element.checked
                                }
                            }
                            return false
                        }
                    """),
                )
        raise AssertionError(_("Unsupported attribute: %s") % attr)
async def web_click(self, selector_type:By, selector_value:str, *, timeout:int | float | None = None) -> Element:
    """
    Locates an HTML element and clicks it, followed by a short randomized pause.

    :param timeout: timeout in seconds
    :raises TimeoutError: if element could not be found within time
    :return: the clicked element
    """
    target = await self.web_find(selector_type, selector_value, timeout = timeout)
    await target.click()
    await self.web_sleep()
    return target
async def web_execute(self, jscode:str) -> Any:
    """
    Executes the given JavaScript code in the context of the current page.

    Handles nodriver 0.47+ RemoteObject results by converting them to regular Python objects.
    Uses the RemoteObject API (value, deep_serialized_value) for proper conversion.

    :param jscode: JavaScript code to execute
    :return: The javascript's return value as a regular Python object
    """
    # Try to get the result with return_by_value=True first
    result = await self.page.evaluate(jscode, await_promise = True, return_by_value = True)

    # If we got a RemoteObject, use the proper API to get properties
    if _is_remote_object(result):
        try:
            # Type cast to RemoteObject for type checker
            remote_obj:"RemoteObject" = result
            # Use the proper RemoteObject API - try to get the value directly first
            if hasattr(remote_obj, "value") and remote_obj.value is not None:
                return remote_obj.value
            # For complex objects, use deep_serialized_value which contains the actual data
            if hasattr(remote_obj, "deep_serialized_value") and remote_obj.deep_serialized_value:
                value = remote_obj.deep_serialized_value.value
                # Convert the complex nested structure to a proper dictionary
                return self._convert_remote_object_value(value)
            # Fallback to the original result
            return remote_obj
        except Exception as e:
            # best-effort extraction: on any failure fall back to the raw result
            LOG.debug("Failed to extract value from RemoteObject: %s", e)
            return result

    # debug log the jscode but avoid excessive debug logging of window.scrollTo calls:
    # the previously executed jscode is memoized as an attribute on the class method,
    # so identical calls (and consecutive scrollTo calls) are only logged once
    _prev_jscode:str = getattr(self.__class__.web_execute, "_prev_jscode", "")
    if not (jscode == _prev_jscode or (jscode.startswith("window.scrollTo") and _prev_jscode.startswith("window.scrollTo"))):
        LOG.debug("web_execute(`%s`) = `%s`", jscode, result)
    self.__class__.web_execute._prev_jscode = jscode  # type: ignore[attr-defined] # noqa: SLF001 Private member accessed
    return result
def _convert_remote_object_value(self, data:Any) -> Any:
"""
Recursively converts RemoteObject values to regular Python objects.
Handles the complex nested structure from deep_serialized_value.
Converts key/value lists to dictionaries and processes type/value structures.
:param data: The data to convert (list, dict, or primitive)
:return: Converted Python object
"""
if isinstance(data, list):
# Check if this is a key/value list format: [["key", "value"], ...]
if data and isinstance(data[0], list) and len(data[0]) == _KEY_VALUE_PAIR_SIZE:
# Convert list of [key, value] pairs to dict
converted_dict = {}
for item in data:
if len(item) == _KEY_VALUE_PAIR_SIZE:
key, value = item
# Handle nested structures in values
if isinstance(value, dict) and "type" in value and "value" in value:
# Extract the actual value from the type/value structure
converted_dict[key] = self._convert_remote_object_value(value["value"])
else:
converted_dict[key] = self._convert_remote_object_value(value)
return converted_dict
# Regular list - convert each item
return [self._convert_remote_object_value(item) for item in data]
if isinstance(data, dict):
# Handle type/value structures: {'type': 'string', 'value': 'actual_value'}
if "type" in data and "value" in data:
return self._convert_remote_object_value(data["value"])
# Regular dict - convert each value
return {key: self._convert_remote_object_value(value) for key, value in data.items()}
# Return primitive values as-is
return data
async def _xpath_first(self, selector_value:str) -> Element | None:
    """Returns the first non-None XPath match on the current page, or None if there is none."""
    for candidate in await self.page.xpath(selector_value, timeout = 0):
        if candidate is not None:
            return cast(Element, candidate)
    return None
async def _xpath_all(self, selector_value:str) -> list[Element]:
    """Returns all non-None XPath matches on the current page."""
    candidates = await self.page.xpath(selector_value, timeout = 0)
    return [cast(Element, candidate) for candidate in candidates if candidate is not None]
async def web_find(self, selector_type:By, selector_value:str, *, parent:Element | None = None, timeout:int | float | None = None) -> Element:
    """
    Locates a single HTML element by the given selector type and value.

    :param timeout: timeout in seconds (base value before multiplier/backoff)
    :raises TimeoutError: if element could not be found within time
    """

    async def single_attempt(effective_timeout:float) -> Element:
        return await self._web_find_once(selector_type, selector_value, effective_timeout, parent = parent)

    return await self._run_with_timeout_retries(
        single_attempt,
        description = f"web_find({selector_type.name}, {selector_value})",
        key = "default",
        override = timeout,
    )
async def web_find_all(self, selector_type:By, selector_value:str, *, parent:Element | None = None, timeout:int | float | None = None) -> list[Element]:
    """
    Locates multiple HTML elements by the given selector type and value.

    :param timeout: timeout in seconds (base value before multiplier/backoff)
    :raises TimeoutError: if element could not be found within time
    """

    async def single_attempt(effective_timeout:float) -> list[Element]:
        return await self._web_find_all_once(selector_type, selector_value, effective_timeout, parent = parent)

    return await self._run_with_timeout_retries(
        single_attempt,
        description = f"web_find_all({selector_type.name}, {selector_value})",
        key = "default",
        override = timeout,
    )
async def _web_find_once(self, selector_type:By, selector_value:str, timeout:float, *, parent:Element | None = None) -> Element:
    """
    Performs a single lookup attempt for one element using the given selector strategy.

    Dispatches to the nodriver query API matching the selector type. The timeout is
    passed through unchanged (apply_multiplier=False) because the caller has already
    applied multiplier/backoff logic.

    :param timeout: effective timeout in seconds (already adjusted by the caller)
    :raises TimeoutError: if no matching element could be found within the timeout
    :raises AssertionError: for unsupported selector types or unsupported parent usage
    """
    timeout_suffix = f" within {timeout} seconds."
    match selector_type:
        case By.ID:
            # translated via METACHAR_ESCAPER (defined elsewhere in this module) -
            # presumably escapes CSS metacharacters so the "#..." selector stays valid
            escaped_id = selector_value.translate(METACHAR_ESCAPER)
            return await self.web_await(
                lambda: self.page.query_selector(f"#{escaped_id}", parent),
                timeout = timeout,
                timeout_error_message = f"No HTML element found with ID '{selector_value}'{timeout_suffix}",
                apply_multiplier = False,
            )
        case By.CLASS_NAME:
            # same escaping as for IDs, applied to the ".classname" selector
            escaped_classname = selector_value.translate(METACHAR_ESCAPER)
            return await self.web_await(
                lambda: self.page.query_selector(f".{escaped_classname}", parent),
                timeout = timeout,
                timeout_error_message = f"No HTML element found with CSS class '{selector_value}'{timeout_suffix}",
                apply_multiplier = False,
            )
        case By.TAG_NAME:
            return await self.web_await(
                lambda: self.page.query_selector(selector_value, parent),
                timeout = timeout,
                timeout_error_message = f"No HTML element found of tag <{selector_value}>{timeout_suffix}",
                apply_multiplier = False,
            )
        case By.CSS_SELECTOR:
            return await self.web_await(
                lambda: self.page.query_selector(selector_value, parent),
                timeout = timeout,
                timeout_error_message = f"No HTML element found using CSS selector '{selector_value}'{timeout_suffix}",
                apply_multiplier = False,
            )
        case By.TEXT:
            # parent filtering is not supported for text lookups (enforced here)
            ensure(not parent, f"Specifying a parent element currently not supported with selector type: {selector_type}")
            return await self.web_await(
                lambda: self.page.find_element_by_text(selector_value, best_match = True),
                timeout = timeout,
                timeout_error_message = f"No HTML element found containing text '{selector_value}'{timeout_suffix}",
                apply_multiplier = False,
            )
        case By.XPATH:
            # parent filtering is not supported for XPath lookups (enforced here)
            ensure(not parent, f"Specifying a parent element currently not supported with selector type: {selector_type}")
            return await self.web_await(
                lambda: self._xpath_first(selector_value),
                timeout = timeout,
                timeout_error_message = f"No HTML element found using XPath '{selector_value}'{timeout_suffix}",
                apply_multiplier = False,
            )
    raise AssertionError(_("Unsupported selector type: %s") % selector_type)
async def _web_find_all_once(self, selector_type:By, selector_value:str, timeout:float, *, parent:Element | None = None) -> list[Element]:
    """
    Performs a single lookup attempt for multiple elements using the given selector strategy.

    Same dispatch scheme as _web_find_once but using the *_all / plural nodriver APIs.
    Note: By.ID is intentionally not handled here (an ID identifies at most one element).
    The timeout is passed through unchanged (apply_multiplier=False) because the caller
    has already applied multiplier/backoff logic.

    :param timeout: effective timeout in seconds (already adjusted by the caller)
    :raises TimeoutError: if no matching elements could be found within the timeout
    :raises AssertionError: for unsupported selector types or unsupported parent usage
    """
    timeout_suffix = f" within {timeout} seconds."
    match selector_type:
        case By.CLASS_NAME:
            # translated via METACHAR_ESCAPER (defined elsewhere in this module) -
            # presumably escapes CSS metacharacters so the ".classname" selector stays valid
            escaped_classname = selector_value.translate(METACHAR_ESCAPER)
            return await self.web_await(
                lambda: self.page.query_selector_all(f".{escaped_classname}", parent),
                timeout = timeout,
                timeout_error_message = f"No HTML elements found with CSS class '{selector_value}'{timeout_suffix}",
                apply_multiplier = False,
            )
        case By.CSS_SELECTOR:
            return await self.web_await(
                lambda: self.page.query_selector_all(selector_value, parent),
                timeout = timeout,
                timeout_error_message = f"No HTML elements found using CSS selector '{selector_value}'{timeout_suffix}",
                apply_multiplier = False,
            )
        case By.TAG_NAME:
            return await self.web_await(
                lambda: self.page.query_selector_all(selector_value, parent),
                timeout = timeout,
                timeout_error_message = f"No HTML elements found of tag <{selector_value}>{timeout_suffix}",
                apply_multiplier = False,
            )
        case By.TEXT:
            # parent filtering is not supported for text lookups (enforced here)
            ensure(not parent, f"Specifying a parent element currently not supported with selector type: {selector_type}")
            return await self.web_await(
                lambda: self.page.find_elements_by_text(selector_value),
                timeout = timeout,
                timeout_error_message = f"No HTML elements found containing text '{selector_value}'{timeout_suffix}",
                apply_multiplier = False,
            )
        case By.XPATH:
            # parent filtering is not supported for XPath lookups (enforced here)
            ensure(not parent, f"Specifying a parent element currently not supported with selector type: {selector_type}")
            return await self.web_await(
                lambda: self._xpath_all(selector_value),
                timeout = timeout,
                timeout_error_message = f"No HTML elements found using XPath '{selector_value}'{timeout_suffix}",
                apply_multiplier = False,
            )
    raise AssertionError(_("Unsupported selector type: %s") % selector_type)
async def web_input(self, selector_type:By, selector_value:str, text:str | int, *, timeout:int | float | None = None) -> Element:
    """
    Enters text into an HTML input field, replacing any existing content,
    followed by a short randomized pause.

    :param timeout: timeout in seconds
    :raises TimeoutError: if element could not be found within time
    :return: the input element
    """
    field = await self.web_find(selector_type, selector_value, timeout = timeout)
    await field.clear_input()
    await field.send_keys(str(text))
    await self.web_sleep()
    return field
async def web_open(self, url:str, *, timeout:int | float | None = None, reload_if_already_open:bool = False) -> None:
    """
    Opens the given URL in the current browser tab and waits for the page to finish loading.

    :param url: url to open in browser
    :param timeout: timespan in seconds within the page needs to be loaded (base value)
    :param reload_if_already_open: if False does nothing if the URL is already open in the browser
    :raises TimeoutError: if page did not open within given timespan
    """
    LOG.debug(" -> Opening [%s]...", url)

    already_open = bool(self.page) and url == self.page.url
    if already_open and not reload_if_already_open:
        LOG.debug(" => skipping, [%s] is already open", url)
        return

    self.page = await self.browser.get(url = url, new_tab = False, new_window = False)

    # wait until the DOM reports the page as fully loaded
    page_timeout = self._effective_timeout("page_load", timeout)
    await self.web_await(
        lambda: self.web_execute("document.readyState == 'complete'"),
        timeout = page_timeout,
        timeout_error_message = f"Page did not finish loading within {page_timeout} seconds.",
        apply_multiplier = False,
    )
async def web_text(self, selector_type:By, selector_value:str, *, parent:Element | None = None, timeout:int | float | None = None) -> str:
    """
    Locates an HTML element and returns its visible text content.

    :param timeout: timeout in seconds
    :raises TimeoutError: if element could not be found within time
    """
    found = await self.web_find(selector_type, selector_value, parent = parent, timeout = timeout)
    return await self._extract_visible_text(found)
async def web_sleep(self, min_ms:int = 1_000, max_ms:int = 2_500) -> None:
    """
    Pauses for a random duration between min_ms (inclusive) and max_ms (exclusive)
    milliseconds; if max_ms <= min_ms the pause is exactly min_ms.

    :param min_ms: minimum pause duration in milliseconds
    :param max_ms: maximum pause duration in milliseconds (exclusive)
    """
    # BUGFIX: was `max_ms <= min_ms and min_ms or secrets.randbelow(...)`,
    # the broken `and/or` ternary idiom - with min_ms == 0 (falsy) and
    # max_ms <= 0 it fell through to secrets.randbelow(<= 0), which raises
    # ValueError; a proper conditional expression avoids that
    duration = min_ms if max_ms <= min_ms else secrets.randbelow(max_ms - min_ms) + min_ms
    LOG.log(
        loggers.INFO if duration > 1_500 else loggers.DEBUG,  # noqa: PLR2004 Magic value used in comparison
        " ... pausing for %d ms ...",
        duration,
    )
    await self.page.sleep(duration / 1_000)
async def _navigate_paginated_ad_overview(
    self,
    page_action:Callable[[int], Awaitable[bool]],
    page_url:str = "https://www.kleinanzeigen.de/m-meine-anzeigen.html",
    *,
    max_pages:int = 10,
) -> bool:
    """
    Navigate through paginated ad overview page, calling page_action on each page.

    This helper guarantees to return a boolean result and never propagates TimeoutError.
    All timeout conditions are handled internally and logged appropriately.

    Args:
        page_action: Async callable that receives current_page number and returns True if action succeeded/should stop
        page_url: URL of the paginated overview page (default: kleinanzeigen ad management page)
        max_pages: Maximum number of pages to navigate (safety limit)

    Returns:
        True if page_action returned True on any page, False otherwise

    Example:
        async def find_ad_callback(page_num: int) -> bool:
            element = await self.web_find(By.XPATH, "//div[@id='my-ad']")
            if element:
                await element.click()
                return True
            return False

        success = await self._navigate_paginated_ad_overview(find_ad_callback)
    """
    try:
        await self.web_open(page_url)
    except TimeoutError:
        LOG.warning("Failed to open ad overview page at %s: timeout", page_url)
        return False
    # randomized 2-3 second pause after opening the page
    await self.web_sleep(2000, 3000)

    # Check if ad list container exists
    try:
        _ = await self.web_find(By.ID, "my-manageitems-adlist")
    except TimeoutError:
        LOG.warning("Ad list container not found. Maybe no ads present?")
        return False

    # Check for pagination controls; a TimeoutError here simply means there is
    # no pagination section, i.e. a single page of results
    multi_page:bool = False
    pagination_timeout = self._timeout("pagination_initial")
    try:
        pagination_section = await self.web_find(By.CSS_SELECTOR, ".Pagination", timeout = pagination_timeout)
        next_buttons = await self.web_find_all(By.CSS_SELECTOR, 'button[aria-label="Nächste"]', parent = pagination_section)
        if next_buttons:
            # only an enabled "next" button indicates there really are more pages
            enabled_next_buttons = [btn for btn in next_buttons if not btn.attrs.get("disabled")]
            if enabled_next_buttons:
                multi_page = True
                LOG.info("Multiple ad pages detected.")
    except TimeoutError:
        LOG.info("No pagination controls found. Assuming single page.")

    current_page:int = 1
    while current_page <= max_pages:
        LOG.info("Processing page %s...", current_page)
        try:
            # scroll to the bottom of the page - presumably to trigger lazy
            # loading of list entries (TODO confirm); scroll timeouts are non-fatal
            await self.web_scroll_page_down()
        except TimeoutError:
            LOG.debug("Scroll timeout on page %s (non-critical, continuing)", current_page)
        await self.web_sleep(2000, 3000)

        try:
            if await page_action(current_page):
                return True
        except TimeoutError:
            LOG.warning("Page action timed out on page %s", current_page)
            return False

        if not multi_page:
            break

        # try to advance to the next page via the first enabled "next" button
        follow_up_timeout = self._timeout("pagination_follow_up")
        try:
            pagination_section = await self.web_find(By.CSS_SELECTOR, ".Pagination", timeout = follow_up_timeout)
            next_button_element = None
            possible_next_buttons = await self.web_find_all(By.CSS_SELECTOR, 'button[aria-label="Nächste"]', parent = pagination_section)
            for btn in possible_next_buttons:
                if not btn.attrs.get("disabled"):
                    next_button_element = btn
                    break
            if next_button_element:
                LOG.info("Navigating to page %s...", current_page + 1)
                await next_button_element.click()
                await self.web_sleep(3000, 4000)
                current_page += 1
            else:
                LOG.info("Last page reached (no enabled 'Naechste' button found).")
                break
        except TimeoutError:
            LOG.info("No pagination controls found. Assuming last page.")
            break
    return False
async def web_request(self, url:str, method:str = "GET", valid_response_codes:int | Iterable[int] = 200, headers:dict[str, str] | None = None) -> Any:
    """
    Performs an HTTP request from within the browser context using the fetch API.

    :param url: the URL to request
    :param method: the HTTP method (case-insensitive, upper-cased before use)
    :param valid_response_codes: response status code(s) considered successful
    :param headers: optional request headers
    :return: dict with "statusCode", "statusMessage", "headers" and "content" keys
    :raises AssertionError: if the response status code is not in valid_response_codes
    """
    method = method.upper()
    LOG.debug(" -> HTTP %s [%s]...", method, url)
    # NOTE: url/method are interpolated into JS without escaping - callers must
    # not pass untrusted values containing quotes.
    # BUGFIX: headers are serialized with json.dumps instead of splicing the
    # Python dict repr into the script - the repr produced invalid JavaScript
    # for values like True/False/None, whereas JSON is always a valid JS literal
    response = await self.web_execute(f"""
        fetch("{url}", {{
            method: "{method}",
            redirect: "follow",
            headers: {json.dumps(headers or {})}
        }})
        .then(response => response.text().then(responseText => {{
            headers = {{}};
            response.headers.forEach((v, k) => headers[k] = v);
            return {{
                statusCode: response.status,
                statusMessage: response.statusText,
                headers: headers,
                content: responseText
            }}
        }}))
    """)
    # normalize a single accepted status code to a list
    if isinstance(valid_response_codes, int):
        valid_response_codes = [valid_response_codes]
    ensure(
        response["statusCode"] in valid_response_codes,
        f'Invalid response "{response["statusCode"]} {response["statusMessage"]}" received for HTTP {method} to {url}',
    )
    return response
# pylint: enable=dangerous-default-value
async def web_scroll_page_down(self, scroll_length:int = 10, scroll_speed:int = 10_000, *, scroll_back_top:bool = False) -> None:
    """
    Smoothly scrolls the current web page down.

    :param scroll_length: the length of a single scroll iteration, determines smoothness of scrolling, lower is smoother
    :param scroll_speed: the speed of scrolling, higher is faster
    :param scroll_back_top: whether to scroll the page back to the top after scrolling to the bottom
    """
    step_delay = scroll_length / scroll_speed
    y_pos = 0
    page_bottom:int = await self.web_execute("document.body.scrollHeight")

    # step down until the bottom of the page is reached
    while y_pos < page_bottom:
        y_pos += scroll_length
        await self.web_execute(f"window.scrollTo(0, {y_pos})")
        await asyncio.sleep(step_delay)

    if scroll_back_top:
        # step back up in the same style but at double speed
        while y_pos > 0:
            y_pos -= scroll_length
            await self.web_execute(f"window.scrollTo(0, {y_pos})")
            await asyncio.sleep(step_delay / 2)
async def web_select(self, selector_type:By, selector_value:str, selected_value:Any, timeout:int | float | None = None) -> Element:
"""
Selects an of a HTML element.
:param timeout: timeout in seconds
:raises TimeoutError: if element could not be found within time
:raises UnexpectedTagNameException: if element is not a