mirror of
https://github.com/Second-Hand-Friends/kleinanzeigen-bot.git
synced 2026-03-12 10:31:50 +01:00
## ℹ️ Description - Related issues: #671, #658 - Introduces configurable timeout controls plus retry/backoff handling for flaky DOM operations. We often see timeouts which are not reproducible in certain configurations. I suspect timeout issues based on a combination of internet speed, browser, OS, age of the computer and the weather. This PR introduces a comprehensive config model to tweak timeouts. ## 📋 Changes Summary - add TimeoutConfig to the main config/schema and expose timeouts in README/docs - wire WebScrapingMixin, extractor, update checker, and browser diagnostics to honor the configurable timeouts and retries - update translations/tests to cover the new behaviour and ensure lint/mypy/pyright pipelines remain green ### ⚙️ Type of Change - [ ] 🐞 Bug fix (non-breaking change which fixes an issue) - [x] ✨ New feature (adds new functionality without breaking existing usage) - [ ] 💥 Breaking change (changes that might break existing user setups, scripts, or configurations) ## ✅ Checklist - [x] I have reviewed my changes to ensure they meet the project's standards. - [x] I have tested my changes and ensured that all tests pass (`pdm run test`). - [x] I have formatted the code (`pdm run format`). - [x] I have verified that linting passes (`pdm run lint`). - [x] I have updated documentation where necessary. <!-- This is an auto-generated comment: release notes by coderabbit.ai --> ## Summary by CodeRabbit * **New Features** * Centralized, configurable timeout system for web interactions, detection flows, publishing, and pagination. * Optional retry with exponential backoff for operations that time out. * **Improvements** * Replaced fixed wait times with dynamic timeouts throughout workflows. * More informative timeout-related messages and diagnostics. * **Tests** * New and expanded test coverage for timeout behavior, pagination, diagnostics, and retry logic. <!-- end of auto-generated comment: release notes by coderabbit.ai -->
1154 lines
54 KiB
Python
1154 lines
54 KiB
Python
# SPDX-FileCopyrightText: © Sebastian Thomschke and contributors
|
|
# SPDX-License-Identifier: AGPL-3.0-or-later
|
|
# SPDX-ArtifactOfProjectHomePage: https://github.com/Second-Hand-Friends/kleinanzeigen-bot/
|
|
import asyncio, enum, inspect, json, os, platform, secrets, shutil, subprocess, urllib.request # isort: skip # noqa: S404
|
|
from collections.abc import Awaitable, Callable, Coroutine, Iterable
|
|
from gettext import gettext as _
|
|
from typing import Any, Final, Optional, cast
|
|
|
|
try:
|
|
from typing import Never # type: ignore[attr-defined,unused-ignore] # mypy
|
|
except ImportError:
|
|
from typing import NoReturn as Never # Python <3.11
|
|
|
|
import nodriver, psutil # isort: skip
|
|
from typing import TYPE_CHECKING, TypeGuard
|
|
|
|
from nodriver.core.browser import Browser
|
|
from nodriver.core.config import Config as NodriverConfig
|
|
from nodriver.core.element import Element
|
|
from nodriver.core.tab import Tab as Page
|
|
|
|
from kleinanzeigen_bot.model.config_model import Config as BotConfig
|
|
from kleinanzeigen_bot.model.config_model import TimeoutConfig
|
|
|
|
from . import loggers, net
|
|
from .chrome_version_detector import (
|
|
ChromeVersionInfo,
|
|
detect_chrome_version_from_binary,
|
|
detect_chrome_version_from_remote_debugging,
|
|
get_chrome_version_diagnostic_info,
|
|
validate_chrome_136_configuration,
|
|
)
|
|
from .misc import T, ensure
|
|
|
|
if TYPE_CHECKING:
|
|
from nodriver.cdp.runtime import RemoteObject
|
|
|
|
|
|
# Constants for RemoteObject conversion:
# number of items a list entry must have to be treated as a [key, value] pair
_KEY_VALUE_PAIR_SIZE:Final[int] = 2
|
|
|
|
|
|
def _is_remote_object(obj:Any) -> TypeGuard["RemoteObject"]:
|
|
"""Type guard to check if an object is a RemoteObject."""
|
|
return hasattr(obj, "__class__") and "RemoteObject" in str(type(obj))
|
|
|
|
|
|
# names exported as the public API of this module
__all__ = [
    "Browser",
    "BrowserConfig",
    "By",
    "Element",
    "Page",
    "Is",
    "WebScrapingMixin",
]

LOG:Final[loggers.Logger] = loggers.get_logger(__name__)

# translation table that prefixes every selector meta character with a backslash,
# see https://api.jquery.com/category/selectors/
METACHAR_ESCAPER:Final[dict[int, str]] = str.maketrans({ch: "\\" + ch for ch in '!"#$%&\'()*+,./:;<=>?@[\\]^`{|}~'})
|
|
|
|
|
|
def _is_admin() -> bool:
    """
    Tell whether the current process runs with an effective user id of 0 (root).

    :return: ``True`` only when ``os.geteuid`` exists and returns 0; ``False`` in every other case,
        including platforms that do not provide ``os.geteuid``
    """
    try:
        if not hasattr(os, "geteuid"):
            return False
        return bool(os.geteuid() == 0)
    except AttributeError:
        return False
|
|
|
|
|
|
class By(enum.Enum):
    """Selector strategies accepted by the element lookup methods (``web_find`` and friends)."""

    ID = enum.auto()  # value is an element id, queried as "#<id>"
    CLASS_NAME = enum.auto()  # value is a CSS class name, queried as ".<class>"
    CSS_SELECTOR = enum.auto()  # value is passed to query_selector unchanged
    TAG_NAME = enum.auto()  # value is a tag name, passed to query_selector unchanged
    TEXT = enum.auto()  # value is text the element has to contain
    XPATH = enum.auto()  # value is presumably an XPath expression (lookup code not fully visible here)
|
|
|
|
|
|
class Is(enum.Enum):
    """Element states that ``web_check`` can evaluate."""

    CLICKABLE = enum.auto()  # derived from the disabled and displayed checks
    DISPLAYED = enum.auto()  # computed style is visible and the element has a non-zero size
    DISABLED = enum.auto()  # the "disabled" attribute is present
    READONLY = enum.auto()  # the "readonly" attribute is present
    SELECTED = enum.auto()  # a checkbox/radio <input> that is checked
|
|
|
|
|
|
class BrowserConfig:
    """Mutable settings holder describing which browser to use and how to start it."""

    def __init__(self) -> None:
        # additional command line arguments for the browser
        self.arguments:Iterable[str] = []
        # path of the browser executable; None until configured or auto-detected
        self.binary_location:str | None = None
        # extension files to be added to the browser
        self.extensions:Iterable[str] = []
        # whether to open a private/incognito window
        self.use_private_window:bool = True
        # browser user data directory, None if not configured
        self.user_data_dir:str | None = None
        # browser profile name, None if not configured
        self.profile_name:str | None = None
|
|
|
|
|
|
class WebScrapingMixin:
|
|
|
|
def __init__(self) -> None:
    """Initialise the browser settings and leave browser, page and config references unset."""
    self.browser_config:Final[BrowserConfig] = BrowserConfig()

    # no browser session yet; self.browser is assigned by create_browser_session()
    self.browser:Browser = None  # pyright: ignore[reportAttributeAccessIssue]
    self.page:Page = None  # pyright: ignore[reportAttributeAccessIssue]

    # fallback timeouts, created on first use by _get_timeout_config()
    self._default_timeout_config:TimeoutConfig | None = None

    # placeholder until a real bot configuration is assigned
    self.config:BotConfig = cast(BotConfig, None)
|
|
|
|
def _get_timeout_config(self) -> TimeoutConfig:
    """
    Return the timeout settings to use.

    ``self.config.timeouts`` wins whenever a config object with a non-None ``timeouts`` attribute is present.
    Otherwise a default ``TimeoutConfig`` is created on first use, cached on the instance and returned.
    """
    bot_config = getattr(self, "config", None)
    if bot_config is not None:
        configured = cast(Optional[TimeoutConfig], getattr(bot_config, "timeouts", None))
        if configured is not None:
            return configured

    # no usable configuration -> fall back to the lazily created defaults
    if self._default_timeout_config is None:
        self._default_timeout_config = TimeoutConfig()
    return self._default_timeout_config
|
|
|
|
def _timeout(self, key:str = "default", override:float | None = None) -> float:
|
|
"""
|
|
Return the base timeout (seconds) for a given key without applying multipliers.
|
|
"""
|
|
return self._get_timeout_config().resolve(key, override)
|
|
|
|
def _effective_timeout(self, key:str = "default", override:float | None = None, *, attempt:int = 0) -> float:
|
|
"""
|
|
Return the effective timeout (seconds) with multiplier/backoff applied.
|
|
"""
|
|
return self._get_timeout_config().effective(key, override, attempt = attempt)
|
|
|
|
def _timeout_attempts(self) -> int:
    """
    Return how many times a timing-out operation is executed in total.

    :return: 1 when retries are disabled, otherwise the initial attempt plus ``retry_max_attempts`` retries
    """
    timeout_config = self._get_timeout_config()
    configured_retries = timeout_config.retry_max_attempts if timeout_config.retry_enabled else 0
    return 1 + configured_retries
|
|
|
|
async def _run_with_timeout_retries(
    self,
    operation:Callable[[float], Awaitable[T]],
    *,
    description:str,
    key:str = "default",
    override:float | None = None
) -> T:
    """
    Execute an async callable with retry/backoff handling for TimeoutError.

    :param operation: callable receiving the effective timeout (seconds) of the current attempt
    :param description: text identifying the operation in log and error messages
    :param key: timeout setting used to compute the effective timeout
    :param override: optional explicit base timeout
    :return: the result of the first attempt that does not raise TimeoutError
    :raises TimeoutError: if the last attempt timed out, or if no attempt was executed at all
    """
    total_attempts = self._timeout_attempts()
    final_index = total_attempts - 1

    for attempt_index in range(total_attempts):
        current_timeout = self._effective_timeout(key, override, attempt = attempt_index)
        try:
            return await operation(current_timeout)
        except TimeoutError:
            if attempt_index >= final_index:
                # retries exhausted -> surface the original error
                raise
            LOG.debug(
                "Retrying %s after TimeoutError (attempt %d/%d, timeout %.1fs)",
                description,
                attempt_index + 1,
                total_attempts,
                current_timeout
            )

    # only reachable when the loop body never ran
    raise TimeoutError(f"{description} failed without executing operation")
|
|
|
|
async def create_browser_session(self) -> None:
    """
    Attach to an already running browser or start a new one and store the session in ``self.browser``.

    When one of the configured browser arguments is ``--remote-debugging-port=<n>`` with n > 0 the method
    connects to the browser listening on that port (host from ``--remote-debugging-host=``, default 127.0.0.1)
    and returns. Otherwise a new browser is started with a set of default arguments, the configured custom
    arguments, the profile settings and the configured extensions.

    :raises Exception: any error raised while starting/connecting is re-raised; if its text mentions "root",
        troubleshooting hints are logged first
    """
    LOG.info("Creating Browser session...")

    if self.browser_config.binary_location:
        ensure(os.path.exists(self.browser_config.binary_location), f"Specified browser binary [{self.browser_config.binary_location}] does not exist.")
    else:
        self.browser_config.binary_location = self.get_compatible_browser()
    LOG.info(" -> Browser binary location: %s", self.browser_config.binary_location)

    # Chrome version detection and validation
    await self._validate_chrome_version_configuration()

    ########################################################
    # check if an existing browser instance shall be used...
    ########################################################
    remote_host = "127.0.0.1"
    remote_port = 0
    for configured_arg in self.browser_config.arguments:
        if configured_arg.startswith("--remote-debugging-host="):
            remote_host = configured_arg.partition("=")[2]
        elif configured_arg.startswith("--remote-debugging-port="):
            remote_port = int(configured_arg.partition("=")[2])

    if remote_port > 0:
        LOG.info("Using existing browser process at %s:%s", remote_host, remote_port)

        # Enhanced port checking with retry logic
        port_available = await self._check_port_with_retry(remote_host, remote_port)
        ensure(port_available,
            f"Browser process not reachable at {remote_host}:{remote_port}. "
            f"Start the browser with --remote-debugging-port={remote_port} or remove this port from your config.yaml. "
            f"Make sure the browser is running and the port is not blocked by firewall.")

        try:
            attach_config = NodriverConfig(
                browser_executable_path = self.browser_config.binary_location  # actually not necessary but nodriver fails without
            )
            attach_config.host = remote_host
            attach_config.port = remote_port
            self.browser = await nodriver.start(attach_config)  # type: ignore[attr-defined]
            LOG.info("New Browser session is %s", self.browser.websocket_url)
            return
        except Exception as ex:
            if "root" in str(ex).lower():
                LOG.error("Failed to connect to browser. This error often occurs when:")
                LOG.error("1. Running as root user (try running as regular user)")
                LOG.error("2. Browser profile is locked or in use by another process")
                LOG.error("3. Insufficient permissions to access the browser profile")
                LOG.error("4. Browser is not properly started with remote debugging enabled")
                LOG.error("")
                LOG.error("Troubleshooting steps:")
                LOG.error("1. Close all browser instances and try again")
                LOG.error("2. Remove the user_data_dir configuration temporarily")
                LOG.error("3. Start browser manually with: %s --remote-debugging-port=%d",
                    self.browser_config.binary_location, remote_port)
                LOG.error("4. Check if any antivirus or security software is blocking the connection")
            raise

    ########################################################
    # configure and initialize new browser instance...
    ########################################################

    # default_browser_args: @ https://github.com/ultrafunkamsterdam/nodriver/blob/main/nodriver/core/config.py
    # https://peter.sh/experiments/chromium-command-line-switches/
    # https://github.com/GoogleChrome/chrome-launcher/blob/main/docs/chrome-flags-for-tools.md
    browser_args = [
        # "--disable-dev-shm-usage", # https://stackoverflow.com/a/50725918/5116073
        "--disable-crash-reporter",
        "--disable-domain-reliability",
        "--disable-sync",
        "--no-experiments",
        "--disable-search-engine-choice-screen",

        "--disable-features=MediaRouter",
        "--use-mock-keychain",

        "--test-type",  # https://stackoverflow.com/a/36746675/5116073
        # https://chromium.googlesource.com/chromium/src/+/master/net/dns/README.md#request-remapping
        '--host-resolver-rules="MAP connect.facebook.net 127.0.0.1, MAP securepubads.g.doubleclick.net 127.0.0.1, MAP www.googletagmanager.com 127.0.0.1"'
    ]

    is_edge = "edge" in self.browser_config.binary_location.lower()

    if is_edge:
        os.environ["MSEDGEDRIVER_TELEMETRY_OPTOUT"] = "1"  # https://docs.microsoft.com/en-us/microsoft-edge/privacy-whitepaper/#microsoft-edge-driver

    if self.browser_config.use_private_window:
        private_window_flag = "-inprivate" if is_edge else "--incognito"
        browser_args.append(private_window_flag)

    if self.browser_config.profile_name:
        LOG.info(" -> Browser profile name: %s", self.browser_config.profile_name)
        browser_args.append(f"--profile-directory={self.browser_config.profile_name}")

    for browser_arg in self.browser_config.arguments:
        LOG.info(" -> Custom Browser argument: %s", browser_arg)
        browser_args.append(browser_arg)

    if not loggers.is_debug(LOG):
        browser_args.append("--log-level=3")  # INFO: 0, WARNING: 1, ERROR: 2, FATAL: 3

    if self.browser_config.user_data_dir:
        LOG.info(" -> Browser user data dir: %s", self.browser_config.user_data_dir)

    launch_config = NodriverConfig(
        headless = False,
        browser_executable_path = self.browser_config.binary_location,
        browser_args = browser_args,
        user_data_dir = self.browser_config.user_data_dir
    )

    # already logged by nodriver:
    # LOG.debug("-> Effective browser arguments: \n\t\t%s", "\n\t\t".join(cfg.browser_args))

    # Enhanced profile directory handling: seed a Preferences file if the profile has none yet
    if launch_config.user_data_dir:
        profile_dir = os.path.join(launch_config.user_data_dir, self.browser_config.profile_name or "Default")
        os.makedirs(profile_dir, exist_ok = True)
        prefs_file = os.path.join(profile_dir, "Preferences")
        if not os.path.exists(prefs_file):
            LOG.info(" -> Setting chrome prefs [%s]...", prefs_file)
            initial_prefs = {
                "credentials_enable_service": False,
                "enable_do_not_track": True,
                "google": {
                    "services": {
                        "consented_to_sync": False
                    }
                },
                "profile": {
                    "default_content_setting_values": {
                        "popups": 0,
                        "notifications": 2  # 1 = allow, 2 = block browser notifications
                    },
                    "password_manager_enabled": False
                },
                "signin": {
                    "allowed": False
                },
                "translate_site_blacklist": [
                    "www.kleinanzeigen.de"
                ],
                "devtools": {
                    "preferences": {
                        "currentDockState": '"bottom"'
                    }
                }
            }
            with open(prefs_file, "w", encoding = "UTF-8") as fd:
                json.dump(initial_prefs, fd)

    # load extensions
    for crx_extension in self.browser_config.extensions:
        LOG.info(" -> Adding Browser extension: [%s]", crx_extension)
        ensure(os.path.exists(crx_extension), f"Configured extension-file [{crx_extension}] does not exist.")
        launch_config.add_extension(crx_extension)

    try:
        self.browser = await nodriver.start(launch_config)  # type: ignore[attr-defined]
        LOG.info("New Browser session is %s", self.browser.websocket_url)
    except Exception as ex:
        # Clean up any resources that were created during setup
        self._cleanup_session_resources()

        if "root" in str(ex).lower():
            LOG.error("Failed to start browser. This error often occurs when:")
            LOG.error("1. Running as root user (try running as regular user)")
            LOG.error("2. Browser profile is locked or in use by another process")
            LOG.error("3. Insufficient permissions to access the browser profile")
            LOG.error("4. Browser binary is not executable or missing")
            LOG.error("")
            LOG.error("Troubleshooting steps:")
            LOG.error("1. Close all browser instances and try again")
            LOG.error("2. Remove the user_data_dir configuration temporarily")
            LOG.error("3. Try running without profile configuration")
            LOG.error("4. Check browser binary permissions: %s", self.browser_config.binary_location)
            LOG.error("5. Check if any antivirus or security software is blocking the browser")
        raise
|
|
|
|
async def _check_port_with_retry(self, host:str, port:int, max_retries:int = 3, retry_delay:float = 1.0) -> bool:
    """
    Probe a TCP port repeatedly until it is open or the attempts are used up.

    Args:
        host: Host to check
        port: Port to check
        max_retries: Maximum number of probe attempts (no probe happens for values < 1)
        retry_delay: Pause between two attempts in seconds

    Returns:
        True as soon as one probe finds the port open, False otherwise
    """
    attempts_done = 0
    while attempts_done < max_retries:
        if net.is_port_open(host, port):
            return True

        attempts_done += 1
        # only wait if another attempt follows
        if attempts_done < max_retries:
            LOG.debug("Port %s:%s not available, retrying in %.1f seconds (attempt %d/%d)",
                host, port, retry_delay, attempts_done, max_retries)
            await asyncio.sleep(retry_delay)

    return False
|
|
|
|
def diagnose_browser_issues(self) -> None:
    """
    Diagnose common browser connection issues and provide troubleshooting information.

    Logs findings about the browser binary, the user data directory, the configured remote debugging port,
    running processes of the target browser and (on Linux) root usage, then runs the Chrome version
    diagnostics. All results are written to the log; nothing is returned.

    Side effect: when no binary is configured and auto-detection succeeds, the detected path is stored in
    ``self.browser_config.binary_location``.
    """
    LOG.info("=== Browser Connection Diagnostics ===")

    # Check browser binary
    if self.browser_config.binary_location:
        if os.path.exists(self.browser_config.binary_location):
            LOG.info("(ok) Browser binary exists: %s", self.browser_config.binary_location)
            if os.access(self.browser_config.binary_location, os.X_OK):
                LOG.info("(ok) Browser binary is executable")
            else:
                LOG.error("(fail) Browser binary is not executable")
        else:
            LOG.error("(fail) Browser binary not found: %s", self.browser_config.binary_location)
    else:
        # get_compatible_browser() reports "nothing found" by raising AssertionError;
        # treat that as a diagnostic finding instead of aborting the whole report
        try:
            browser_path:str | None = self.get_compatible_browser()
        except AssertionError:
            browser_path = None
        if browser_path:
            LOG.info("(ok) Auto-detected browser: %s", browser_path)
            # Set the binary location for Chrome version detection
            self.browser_config.binary_location = browser_path
        else:
            LOG.error("(fail) No compatible browser found")

    # Check user data directory
    if self.browser_config.user_data_dir:
        if os.path.exists(self.browser_config.user_data_dir):
            LOG.info("(ok) User data directory exists: %s", self.browser_config.user_data_dir)
            if os.access(self.browser_config.user_data_dir, os.R_OK | os.W_OK):
                LOG.info("(ok) User data directory is readable and writable")
            else:
                LOG.error("(fail) User data directory permissions issue")
        else:
            LOG.info("(info) User data directory does not exist (will be created): %s", self.browser_config.user_data_dir)

    # Check for remote debugging port
    remote_port = 0
    for arg in self.browser_config.arguments:
        if arg.startswith("--remote-debugging-port="):
            remote_port = int(arg.split("=", maxsplit = 1)[1])
            break

    if remote_port > 0:
        LOG.info("(info) Remote debugging port configured: %d", remote_port)
        if net.is_port_open("127.0.0.1", remote_port):
            LOG.info("(ok) Remote debugging port is open")
            # Try to get more information about the debugging endpoint
            try:
                probe_timeout = self._effective_timeout("chrome_remote_probe")
                # the context manager closes the HTTP response even if reading/parsing fails
                with urllib.request.urlopen(f"http://127.0.0.1:{remote_port}/json/version", timeout = probe_timeout) as response:
                    version_info = json.loads(response.read().decode())
                LOG.info("(ok) Remote debugging API accessible - Browser: %s", version_info.get("Browser", "Unknown"))
            except Exception as e:
                LOG.warning("(fail) Remote debugging port is open but API not accessible: %s", str(e))
                LOG.info(" This might indicate a browser update issue or configuration problem")
        else:
            LOG.info("(info) Remote debugging port is not open")

    # Check for running browser processes
    browser_processes = []
    target_browser_name = ""

    # Get the target browser name for comparison
    if self.browser_config.binary_location:
        target_browser_name = os.path.basename(self.browser_config.binary_location).lower()
    else:
        try:
            target_browser_path = self.get_compatible_browser()
            target_browser_name = os.path.basename(target_browser_path).lower()
        except (AssertionError, TypeError):
            target_browser_name = ""

    try:
        for proc in psutil.process_iter(["pid", "name", "cmdline"]):
            try:
                proc_name = proc.info["name"] or ""
                cmdline = proc.info["cmdline"] or []

                # Check if this is a browser process relevant to our diagnostics
                is_relevant_browser = False

                # Is this the target browser?
                is_target_browser = target_browser_name and target_browser_name in proc_name.lower()

                # Does it have remote debugging?
                has_remote_debugging = cmdline and any(arg.startswith("--remote-debugging-port=") for arg in cmdline)

                # Detect target browser processes for diagnostics
                if is_target_browser:
                    is_relevant_browser = True
                    # Add debugging status to the process info for better diagnostics
                    proc.info["has_remote_debugging"] = has_remote_debugging

                if is_relevant_browser:
                    browser_processes.append(proc.info)
            except (psutil.NoSuchProcess, psutil.AccessDenied):
                # process vanished or is not inspectable -> skip it
                pass
    except (psutil.Error, PermissionError) as exc:
        LOG.warning("(warn) Unable to inspect browser processes: %s", exc)
        browser_processes = []

    if browser_processes:
        LOG.info("(info) Found %d browser processes running", len(browser_processes))
        for proc in browser_processes[:3]:  # Show first 3
            has_debugging = proc.get("has_remote_debugging", False)
            if has_debugging:
                LOG.info(" - PID %d: %s (remote debugging enabled)", proc["pid"], proc["name"])
            else:
                LOG.warning(" - PID %d: %s (remote debugging NOT enabled)", proc["pid"], proc["name"])
    else:
        LOG.info("(info) No browser processes currently running")

    if platform.system() == "Linux":
        if _is_admin():
            LOG.error("(fail) Running as root - this can cause browser issues")

    # Chrome version detection and validation
    self._diagnose_chrome_version_issues(remote_port)

    LOG.info("=== End Diagnostics ===")
|
|
|
|
def close_browser_session(self) -> None:
    """
    Stop the current browser session (if any) and kill child processes the browser left behind.

    Does nothing when ``self.browser`` is unset. Otherwise ``self.page`` is cleared, the browser is stopped,
    children of the browser process that are still running afterwards are killed, and ``self.browser`` is
    reset to ``None`` even if stopping fails.
    """
    if not self.browser:
        return

    LOG.debug("Closing Browser session...")
    self.page = None  # pyright: ignore[reportAttributeAccessIssue]

    browser_children:list[psutil.Process] = []
    browser_pid = self.browser._process_pid  # noqa: SLF001 Private member accessed
    # psutil.Process(None) refers to the *current* process, so an unset pid must never be
    # looked up - otherwise children of this very process would be killed below
    if browser_pid is not None:
        try:
            browser_children = psutil.Process(browser_pid).children()
        except psutil.Error:
            # best effort: the browser process is already gone or cannot be inspected
            browser_children = []

    try:
        self.browser.stop()
        for p in browser_children:
            try:
                if p.is_running():
                    p.kill()  # terminate orphaned browser processes
            except psutil.NoSuchProcess:
                # the child exited between the check and the kill
                continue
    finally:
        self.browser = None  # pyright: ignore[reportAttributeAccessIssue]
|
|
|
|
def _cleanup_session_resources(self) -> None:
    """Drop the browser and page references held by this instance after a failed session setup."""
    # both references are reset in one step: first browser, then page
    self.browser = self.page = None  # pyright: ignore[reportAttributeAccessIssue]
|
|
|
|
def get_compatible_browser(self) -> str:
|
|
match platform.system():
|
|
case "Linux":
|
|
browser_paths = [
|
|
shutil.which("chromium"),
|
|
shutil.which("chromium-browser"),
|
|
shutil.which("google-chrome"),
|
|
shutil.which("microsoft-edge")
|
|
]
|
|
|
|
case "Darwin":
|
|
browser_paths = [
|
|
"/Applications/Chromium.app/Contents/MacOS/Chromium",
|
|
"/Applications/Google Chrome.app/Contents/MacOS/Google Chrome",
|
|
"/Applications/Microsoft Edge.app/Contents/MacOS/Microsoft Edge",
|
|
]
|
|
|
|
case "Windows":
|
|
browser_paths = [
|
|
os.environ.get("PROGRAMFILES", "C:\\Program Files") + r"\Microsoft\Edge\Application\msedge.exe",
|
|
os.environ.get("PROGRAMFILES(X86)", "C:\\Program Files (x86)") + r"\Microsoft\Edge\Application\msedge.exe",
|
|
|
|
os.environ["PROGRAMFILES"] + r"\Chromium\Application\chrome.exe",
|
|
os.environ["PROGRAMFILES(X86)"] + r"\Chromium\Application\chrome.exe",
|
|
os.environ["LOCALAPPDATA"] + r"\Chromium\Application\chrome.exe",
|
|
|
|
os.environ["PROGRAMFILES"] + r"\Chrome\Application\chrome.exe",
|
|
os.environ["PROGRAMFILES(X86)"] + r"\Chrome\Application\chrome.exe",
|
|
os.environ["LOCALAPPDATA"] + r"\Chrome\Application\chrome.exe",
|
|
|
|
shutil.which("msedge.exe"),
|
|
shutil.which("chromium.exe"),
|
|
shutil.which("chrome.exe")
|
|
]
|
|
|
|
case _ as os_name:
|
|
raise AssertionError(_("Installed browser for OS %s could not be detected") % os_name)
|
|
|
|
for browser_path in browser_paths:
|
|
if browser_path and os.path.isfile(browser_path):
|
|
return browser_path
|
|
|
|
raise AssertionError(_("Installed browser could not be detected"))
|
|
|
|
async def web_await(self, condition:Callable[[], T | Never | Coroutine[Any, Any, T | Never]], *,
        timeout:int | float | None = None, timeout_error_message:str = "", apply_multiplier:bool = True) -> T:
    """
    Polls the given condition until it yields a truthy result and returns that result.

    The condition is evaluated about every 0.5 seconds; it may return a value or an awaitable.
    Exceptions raised by the condition are retried like a falsy result until the timeout is exceeded.

    :param condition: callable to evaluate
    :param timeout: timeout in seconds (base value, multiplier applied unless disabled); defaults to the configured default timeout
    :param timeout_error_message: message of the TimeoutError, a generic one is used if empty
    :param apply_multiplier: if False the timeout is used as-is
    :raises TimeoutError: if the condition was not met within time and its last evaluation did not raise
    :raises Exception: the error of the last evaluation, if it raised and the timeout is exceeded
    """
    event_loop = asyncio.get_running_loop()
    started_at = event_loop.time()

    if timeout is None:
        base_timeout = self._timeout()
    else:
        base_timeout = timeout

    if apply_multiplier:
        effective_timeout = self._effective_timeout(override = base_timeout)
    else:
        effective_timeout = base_timeout

    while True:
        await self.page
        last_error:Exception | None = None
        try:
            outcome = condition()
            if inspect.isawaitable(outcome):
                outcome = await outcome
            if outcome:
                return cast(T, outcome)
        except Exception as caught:
            last_error = caught

        if event_loop.time() - started_at > effective_timeout:
            if last_error:
                raise last_error
            raise TimeoutError(timeout_error_message or f"Condition not met within {effective_timeout} seconds")
        await self.page.sleep(0.5)
|
|
|
|
async def web_check(self, selector_type:By, selector_value:str, attr:Is, *, timeout:int | float | None = None) -> bool:
|
|
"""
|
|
Locates an HTML element and returns a state.
|
|
|
|
:param timeout: timeout in seconds
|
|
:raises TimeoutError: if element could not be found within time
|
|
"""
|
|
|
|
def is_disabled(elem:Element) -> bool:
|
|
return elem.attrs.get("disabled") is not None
|
|
|
|
async def is_displayed(elem:Element) -> bool:
|
|
return cast(bool, await elem.apply("""
|
|
function (element) {
|
|
var style = window.getComputedStyle(element);
|
|
return style.display !== 'none'
|
|
&& style.visibility !== 'hidden'
|
|
&& style.opacity !== '0'
|
|
&& element.offsetWidth > 0
|
|
&& element.offsetHeight > 0
|
|
}
|
|
"""))
|
|
|
|
elem:Element = await self.web_find(selector_type, selector_value, timeout = timeout)
|
|
|
|
match attr:
|
|
case Is.CLICKABLE:
|
|
return not is_disabled(elem) or await is_displayed(elem)
|
|
case Is.DISPLAYED:
|
|
return await is_displayed(elem)
|
|
case Is.DISABLED:
|
|
return is_disabled(elem)
|
|
case Is.READONLY:
|
|
return elem.attrs.get("readonly") is not None
|
|
case Is.SELECTED:
|
|
return cast(bool, await elem.apply("""
|
|
function (element) {
|
|
if (element.tagName.toLowerCase() === 'input') {
|
|
if (element.type === 'checkbox' || element.type === 'radio') {
|
|
return element.checked
|
|
}
|
|
}
|
|
return false
|
|
}
|
|
"""))
|
|
raise AssertionError(_("Unsupported attribute: %s") % attr)
|
|
|
|
async def web_click(self, selector_type:By, selector_value:str, *, timeout:int | float | None = None) -> Element:
    """
    Locates an HTML element by the given selector, clicks it and pauses via ``web_sleep()``.

    :param selector_type: how selector_value is to be interpreted
    :param selector_value: the selector identifying the element
    :param timeout: timeout in seconds
    :return: the clicked element
    :raises TimeoutError: if element could not be found within time
    """
    target = await self.web_find(selector_type, selector_value, timeout = timeout)
    await target.click()
    await self.web_sleep()
    return target
|
|
|
|
async def web_execute(self, jscode:str) -> Any:
    """
    Executes the given JavaScript code in the context of the current page.

    Results that are RemoteObject instances (nodriver 0.47+) are converted to regular Python objects
    using the RemoteObject API (value, deep_serialized_value). Other results are returned unchanged
    and debug-logged, except for repeated calls and bursts of window.scrollTo calls.

    :param jscode: JavaScript code to execute
    :return: The javascript's return value as a regular Python object
    """
    # Try to get the result with return_by_value=True first
    result = await self.page.evaluate(jscode, await_promise = True, return_by_value = True)

    if not _is_remote_object(result):
        # debug log the jscode but avoid excessive debug logging of window.scrollTo calls
        previous_jscode:str = getattr(self.__class__.web_execute, "_prev_jscode", "")
        is_repetition = jscode == previous_jscode
        is_scroll_burst = jscode.startswith("window.scrollTo") and previous_jscode.startswith("window.scrollTo")
        if not (is_repetition or is_scroll_burst):
            LOG.debug("web_execute(`%s`) = `%s`", jscode, result)
        # remember the code on the function object shared by all instances
        self.__class__.web_execute._prev_jscode = jscode  # type: ignore[attr-defined] # noqa: SLF001 Private member accessed
        return result

    # We got a RemoteObject -> use the proper API to get its content
    try:
        remote_obj:"RemoteObject" = result

        # simple results carry their value directly
        direct_value = getattr(remote_obj, "value", None)
        if direct_value is not None:
            return direct_value

        # For complex objects, use deep_serialized_value which contains the actual data
        deep_value = getattr(remote_obj, "deep_serialized_value", None)
        if deep_value:
            # Convert the complex nested structure to a proper dictionary
            return self._convert_remote_object_value(deep_value.value)

        # Fallback to the original result
        return remote_obj
    except Exception as ex:
        LOG.debug("Failed to extract value from RemoteObject: %s", ex)
        return result
|
|
|
|
def _convert_remote_object_value(self, data:Any) -> Any:
    """
    Recursively turns a deep_serialized_value structure into plain Python objects.

    - a non-empty list whose first entry is a [key, value] list becomes a dict; entries of a different length are dropped
    - any other list is converted item by item
    - a dict holding both "type" and "value" is replaced by its converted "value"
    - any other dict is converted value by value
    - everything else is returned unchanged

    :param data: The data to convert (list, dict, or primitive)
    :return: Converted Python object
    """
    convert = self._convert_remote_object_value

    if isinstance(data, dict):
        # type/value structures: {'type': 'string', 'value': 'actual_value'}
        if "type" in data and "value" in data:
            return convert(data["value"])
        return {name: convert(entry) for name, entry in data.items()}

    if not isinstance(data, list):
        # primitive values are returned as-is
        return data

    # key/value list format: [["key", "value"], ...]
    is_pair_list = bool(data) and isinstance(data[0], list) and len(data[0]) == _KEY_VALUE_PAIR_SIZE
    if is_pair_list:
        # type/value structures in the values are unwrapped by the recursive call
        return {
            key: convert(value)
            for key, value in (pair for pair in data if len(pair) == _KEY_VALUE_PAIR_SIZE)
        }

    # regular list
    return [convert(entry) for entry in data]
|
|
|
|
async def web_find(self, selector_type:By, selector_value:str, *, parent:Element | None = None, timeout:int | float | None = None) -> Element:
    """
    Locates an HTML element by the given selector type and value.

    The lookup runs through the timeout retry handling, i.e. it may be repeated after a TimeoutError.

    :param selector_type: how selector_value is to be interpreted
    :param selector_value: the selector identifying the element
    :param parent: optional element to search below
    :param timeout: timeout in seconds (base value before multiplier/backoff)
    :raises TimeoutError: if element could not be found within time
    """

    async def find_once(effective_timeout:float) -> Element:
        return await self._web_find_once(selector_type, selector_value, effective_timeout, parent = parent)

    operation_name = f"web_find({selector_type.name}, {selector_value})"
    return await self._run_with_timeout_retries(
        find_once,
        description = operation_name,
        key = "default",
        override = timeout
    )
|
|
|
|
async def web_find_all(self, selector_type:By, selector_value:str, *, parent:Element | None = None, timeout:int | float | None = None) -> list[Element]:
    """
    Locates multiple HTML elements by the given selector type and value.

    The lookup runs through the timeout retry handling, i.e. it may be repeated after a TimeoutError.

    :param selector_type: how selector_value is to be interpreted
    :param selector_value: the selector identifying the elements
    :param parent: optional element to search below
    :param timeout: timeout in seconds (base value before multiplier/backoff)
    :raises TimeoutError: if element could not be found within time
    """

    async def find_all_once(effective_timeout:float) -> list[Element]:
        return await self._web_find_all_once(selector_type, selector_value, effective_timeout, parent = parent)

    operation_name = f"web_find_all({selector_type.name}, {selector_value})"
    return await self._run_with_timeout_retries(
        find_all_once,
        description = operation_name,
        key = "default",
        override = timeout
    )
|
|
|
|
async def _web_find_once(self, selector_type:By, selector_value:str, timeout:float, *, parent:Element | None = None) -> Element:
|
|
timeout_suffix = f" within {timeout} seconds."
|
|
|
|
match selector_type:
|
|
case By.ID:
|
|
escaped_id = selector_value.translate(METACHAR_ESCAPER)
|
|
return await self.web_await(
|
|
lambda: self.page.query_selector(f"#{escaped_id}", parent),
|
|
timeout = timeout,
|
|
timeout_error_message = f"No HTML element found with ID '{selector_value}'{timeout_suffix}",
|
|
apply_multiplier = False)
|
|
case By.CLASS_NAME:
|
|
escaped_classname = selector_value.translate(METACHAR_ESCAPER)
|
|
return await self.web_await(
|
|
lambda: self.page.query_selector(f".{escaped_classname}", parent),
|
|
timeout = timeout,
|
|
timeout_error_message = f"No HTML element found with CSS class '{selector_value}'{timeout_suffix}",
|
|
apply_multiplier = False)
|
|
case By.TAG_NAME:
|
|
return await self.web_await(
|
|
lambda: self.page.query_selector(selector_value, parent),
|
|
timeout = timeout,
|
|
timeout_error_message = f"No HTML element found of tag <{selector_value}>{timeout_suffix}",
|
|
apply_multiplier = False)
|
|
case By.CSS_SELECTOR:
|
|
return await self.web_await(
|
|
lambda: self.page.query_selector(selector_value, parent),
|
|
timeout = timeout,
|
|
timeout_error_message = f"No HTML element found using CSS selector '{selector_value}'{timeout_suffix}",
|
|
apply_multiplier = False)
|
|
case By.TEXT:
|
|
ensure(not parent, f"Specifying a parent element currently not supported with selector type: {selector_type}")
|
|
return await self.web_await(
|
|
lambda: self.page.find_element_by_text(selector_value, best_match = True),
|
|
timeout = timeout,
|
|
timeout_error_message = f"No HTML element found containing text '{selector_value}'{timeout_suffix}",
|
|
apply_multiplier = False)
|
|
case By.XPATH:
|
|
ensure(not parent, f"Specifying a parent element currently not supported with selector type: {selector_type}")
|
|
return await self.web_await(
|
|
lambda: self.page.find_element_by_text(selector_value, best_match = True),
|
|
timeout = timeout,
|
|
timeout_error_message = f"No HTML element found using XPath '{selector_value}'{timeout_suffix}",
|
|
apply_multiplier = False)
|
|
|
|
raise AssertionError(_("Unsupported selector type: %s") % selector_type)
|
|
|
|
async def _web_find_all_once(self, selector_type:By, selector_value:str, timeout:float, *, parent:Element | None = None) -> list[Element]:
|
|
timeout_suffix = f" within {timeout} seconds."
|
|
|
|
match selector_type:
|
|
case By.CLASS_NAME:
|
|
escaped_classname = selector_value.translate(METACHAR_ESCAPER)
|
|
return await self.web_await(
|
|
lambda: self.page.query_selector_all(f".{escaped_classname}", parent),
|
|
timeout = timeout,
|
|
timeout_error_message = f"No HTML elements found with CSS class '{selector_value}'{timeout_suffix}",
|
|
apply_multiplier = False)
|
|
case By.CSS_SELECTOR:
|
|
return await self.web_await(
|
|
lambda: self.page.query_selector_all(selector_value, parent),
|
|
timeout = timeout,
|
|
timeout_error_message = f"No HTML elements found using CSS selector '{selector_value}'{timeout_suffix}",
|
|
apply_multiplier = False)
|
|
case By.TAG_NAME:
|
|
return await self.web_await(
|
|
lambda: self.page.query_selector_all(selector_value, parent),
|
|
timeout = timeout,
|
|
timeout_error_message = f"No HTML elements found of tag <{selector_value}>{timeout_suffix}",
|
|
apply_multiplier = False)
|
|
case By.TEXT:
|
|
ensure(not parent, f"Specifying a parent element currently not supported with selector type: {selector_type}")
|
|
return await self.web_await(
|
|
lambda: self.page.find_elements_by_text(selector_value),
|
|
timeout = timeout,
|
|
timeout_error_message = f"No HTML elements found containing text '{selector_value}'{timeout_suffix}",
|
|
apply_multiplier = False)
|
|
case By.XPATH:
|
|
ensure(not parent, f"Specifying a parent element currently not supported with selector type: {selector_type}")
|
|
return await self.web_await(
|
|
lambda: self.page.find_elements_by_text(selector_value),
|
|
timeout = timeout,
|
|
timeout_error_message = f"No HTML elements found using XPath '{selector_value}'{timeout_suffix}",
|
|
apply_multiplier = False)
|
|
|
|
raise AssertionError(_("Unsupported selector type: %s") % selector_type)
|
|
|
|
async def web_input(self, selector_type:By, selector_value:str, text:str | int, *, timeout:int | float | None = None) -> Element:
    """
    Replaces the content of an HTML input field with the given text.

    The field is located via ``web_find``, cleared, filled with ``str(text)`` and
    afterwards ``web_sleep`` is awaited with its default arguments.

    :param selector_type: kind of selector that ``selector_value`` is written in
    :param selector_value: the selector expression identifying the input field
    :param text: value to type into the field, converted with ``str``
    :param timeout: timeout in seconds, forwarded to ``web_find``
    :return: the located input element
    :raises TimeoutError: if element could not be found within time
    """
    field = await self.web_find(selector_type, selector_value, timeout = timeout)

    # wipe existing content first so the new text is not appended to it
    await field.clear_input()
    await field.send_keys(str(text))

    await self.web_sleep()
    return field
|
|
|
|
async def web_open(self, url:str, *, timeout:int | float | None = None, reload_if_already_open:bool = False) -> None:
    """
    Navigates the browser to the given URL and waits until ``document.readyState`` is 'complete'.

    :param url: url to open in browser
    :param timeout: timespan in seconds within the page needs to be loaded (base value);
        resolved through ``_effective_timeout("page_load", ...)``
    :param reload_if_already_open: if False does nothing if the URL is already open in the browser
    :raises TimeoutError: if the page did not finish loading in time
        (exception type assumed from sibling docstrings, it is raised inside ``web_await`` - confirm)
    """
    LOG.debug(" -> Opening [%s]...", url)

    if not reload_if_already_open:
        if self.page and url == self.page.url:
            LOG.debug(" => skipping, [%s] is already open", url)
            return

    # always reuse the current tab/window for navigation
    self.page = await self.browser.get(url = url, new_tab = False, new_window = False)

    page_timeout = self._effective_timeout("page_load", timeout)
    not_loaded_message = f"Page did not finish loading within {page_timeout} seconds."
    await self.web_await(
        lambda: self.web_execute("document.readyState == 'complete'"),
        timeout = page_timeout,
        timeout_error_message = not_loaded_message,
        apply_multiplier = False
    )
|
|
|
|
async def web_text(self, selector_type:By, selector_value:str, *, parent:Element | None = None, timeout:int | float | None = None) -> str:
    """
    Returns the text of an HTML element as produced by a browser text selection.

    The element is located via ``web_find``; a JavaScript function then selects the node,
    reads the trimmed ``Selection.toString()`` value and clears the selection again.

    :param selector_type: kind of selector that ``selector_value`` is written in
    :param selector_value: the selector expression identifying the element
    :param parent: optional element forwarded to ``web_find``
    :param timeout: timeout in seconds, forwarded to ``web_find``
    :return: the script result converted with ``str``
    """
    element = await self.web_find(selector_type, selector_value, parent = parent, timeout = timeout)
    selected_text = await element.apply("""
        function (elem) {
            let sel = window.getSelection()
            sel.removeAllRanges()
            let range = document.createRange()
            range.selectNode(elem)
            sel.addRange(range)
            let visibleText = sel.toString().trim()
            sel.removeAllRanges()
            return visibleText
        }
    """)
    return str(selected_text)
|
|
|
|
async def web_sleep(self, min_ms:int = 1_000, max_ms:int = 2_500) -> None:
    """
    Pauses for a random duration between ``min_ms`` (inclusive) and ``max_ms`` (exclusive).

    If ``max_ms`` is not greater than ``min_ms`` the pause lasts exactly ``min_ms``.
    Pauses longer than 1500 ms are logged at INFO level, shorter ones at DEBUG level.

    :param min_ms: minimum pause in milliseconds
    :param max_ms: upper bound of the pause in milliseconds
    """
    # A conditional expression is required here: the former `cond and min_ms or ...` form
    # fell through to secrets.randbelow() with a non-positive argument when min_ms was 0.
    duration = min_ms if max_ms <= min_ms else secrets.randbelow(max_ms - min_ms) + min_ms
    LOG.log(loggers.INFO if duration > 1_500 else loggers.DEBUG,  # noqa: PLR2004 Magic value used in comparison
            " ... pausing for %d ms ...", duration)
    await self.page.sleep(duration / 1_000)
|
|
|
|
async def web_request(self, url:str, method:str = "GET", valid_response_codes:int | Iterable[int] = 200,
        headers:dict[str, str] | None = None) -> Any:
    """
    Issues an HTTP request from within the browser page using the JavaScript ``fetch`` API.

    :param url: target URL, inserted verbatim into the generated script
    :param method: HTTP method, upper-cased before use
    :param valid_response_codes: a single status code or several status codes that count as success
    :param headers: optional request headers; rendered into the script via their Python representation
    :return: the script result with the keys ``statusCode``, ``statusMessage``, ``headers`` and ``content``
    :raises AssertionError: presumably raised by ``ensure`` if the status code is not accepted - confirm
    """
    method = method.upper()
    LOG.debug(" -> HTTP %s [%s]...", method, url)
    response = await self.web_execute(f"""
        fetch("{url}", {{
            method: "{method}",
            redirect: "follow",
            headers: {headers or {}}
        }})
        .then(response => response.text().then(responseText => {{
            headers = {{}};
            response.headers.forEach((v, k) => headers[k] = v);
            return {{
                statusCode: response.status,
                statusMessage: response.statusText,
                headers: headers,
                content: responseText
            }}
        }}))
    """)
    if isinstance(valid_response_codes, int):
        valid_response_codes = [valid_response_codes]
    ensure(
        response["statusCode"] in valid_response_codes,
        # the status message must be interpolated too, otherwise the literal expression text is shown
        f'Invalid response "{response["statusCode"]} {response["statusMessage"]}" received for HTTP {method} to {url}'
    )
    return response
|
|
# pylint: enable=dangerous-default-value
|
|
|
|
async def web_scroll_page_down(self, scroll_length:int = 10, scroll_speed:int = 10_000, *, scroll_back_top:bool = False) -> None:
    """
    Scrolls the current page step by step down to ``document.body.scrollHeight``.

    The page height is read once before scrolling starts. Between two steps the coroutine
    sleeps ``scroll_length / scroll_speed`` seconds (half of that while scrolling back up).

    :param scroll_length: the length of a single scroll iteration, determines smoothness of scrolling, lower is smoother
    :param scroll_speed: the speed of scrolling, higher is faster
    :param scroll_back_top: whether to scroll the page back to the top after scrolling to the bottom
    """
    y_pos = 0
    page_bottom:int = await self.web_execute("document.body.scrollHeight")

    # move down one step at a time until the bottom has been reached or passed
    while y_pos < page_bottom:
        y_pos += scroll_length
        await self.web_execute(f"window.scrollTo(0, {y_pos})")
        await asyncio.sleep(scroll_length / scroll_speed)

    if not scroll_back_top:
        return

    # same stepping in the opposite direction, at double speed
    while y_pos > 0:
        y_pos -= scroll_length
        await self.web_execute(f"window.scrollTo(0, {y_pos})")
        await asyncio.sleep(scroll_length / scroll_speed / 2)
|
|
|
|
async def web_select(self, selector_type:By, selector_value:str, selected_value:Any, timeout:int | float | None = None) -> Element:
    """
    Selects an <option/> of a <select/> HTML element.

    Waits until the element is reported as clickable, then runs a script on it that picks the
    first option whose value equals ``selected_value`` and dispatches a bubbling ``change`` event.

    :param selector_type: kind of selector that ``selector_value`` is written in
    :param selector_value: the selector expression identifying the <select/> element
    :param selected_value: option value to select; inserted verbatim into the generated script
    :param timeout: timeout in seconds, used for the clickable check and the element lookup
    :return: the located element
    :raises TimeoutError: if element could not be found within time
    """
    await self.web_await(
        lambda: self.web_check(selector_type, selector_value, Is.CLICKABLE), timeout = timeout,
        timeout_error_message = f"No clickable HTML element with selector: {selector_type}='{selector_value}' found"
    )
    elem = await self.web_find(selector_type, selector_value, timeout = timeout)
    # The script returns as soon as the option was selected. Leaving the loop with `break`
    # would run into the "not found" error below even though the selection succeeded.
    await elem.apply(f"""
        function (element) {{
            for (let i = 0; i < element.options.length; i++) {{
                if (element.options[i].value == "{selected_value}") {{
                    element.selectedIndex = i;
                    element.dispatchEvent(new Event('change', {{ bubbles: true }}));
                    return;
                }}
            }}
            throw new Error("Option with value {selected_value} not found.");
        }}
    """)
    await self.web_sleep()
    return elem
|
|
|
|
async def _validate_chrome_version_configuration(self) -> None:
    """
    Detects the browser version and triggers the Chrome 136+ configuration check if applicable.

    The version is first requested from a browser that already listens on the configured
    remote debugging port; only if that yields nothing the configured browser binary is asked.
    Does nothing when the ``PYTEST_CURRENT_TEST`` environment variable is set.

    NOTE(review): every exception raised inside the detection/validation part is caught and
    only logged as a warning. This includes the AssertionError raised by
    ``_validate_chrome_136_configuration`` - confirm that this is intended.
    """
    # Skip validation in test environments to avoid subprocess calls
    if os.environ.get("PYTEST_CURRENT_TEST"):
        LOG.debug(" -> Skipping browser version validation in test environment")
        return

    host_flag = "--remote-debugging-host="
    port_flag = "--remote-debugging-port="

    try:
        # Remote debugging endpoint as given in the browser arguments, later arguments win
        debug_host = "127.0.0.1"
        debug_port = 0
        for argument in self.browser_config.arguments:
            if argument.startswith(host_flag):
                debug_host = argument.removeprefix(host_flag)
            if argument.startswith(port_flag):
                debug_port = int(argument.removeprefix(port_flag))

        detected = None

        # Preferred source: a browser that is already running with remote debugging enabled
        if debug_port > 0:
            LOG.debug(" -> Checking for existing browser with remote debugging at %s:%s", debug_host, debug_port)
            # Reuse the same port checking logic as in create_browser_session
            if await self._check_port_with_retry(debug_host, debug_port):
                try:
                    detected = detect_chrome_version_from_remote_debugging(
                        debug_host,
                        debug_port,
                        timeout = self._effective_timeout("chrome_remote_debugging")
                    )
                    if detected:
                        LOG.debug(" -> Detected version from existing browser: %s", detected)
                    else:
                        LOG.debug(" -> Port is open but remote debugging API not accessible")
                except Exception as ex:
                    LOG.debug(" -> Failed to detect version from existing browser: %s", ex)
            else:
                LOG.debug(" -> No existing browser found at %s:%s", debug_host, debug_port)

        # Fallback source: the configured browser binary, only used if no remote browser answered
        if not detected:
            browser_binary = self.browser_config.binary_location
            if browser_binary:
                LOG.debug(" -> No remote browser detected, trying binary detection")
                detected = detect_chrome_version_from_binary(
                    browser_binary,
                    timeout = self._effective_timeout("chrome_binary_detection")
                )

        if not detected:
            LOG.debug(" -> Could not detect browser version, skipping validation")
        elif detected.is_chrome_136_plus:
            LOG.info(" -> %s 136+ detected: %s", detected.browser_name, detected)
            await self._validate_chrome_136_configuration(detected)
        else:
            LOG.info(" -> %s version detected: %s (pre-136, no special validation required)", detected.browser_name, detected)
    except (subprocess.SubprocessError, OSError) as ex:  # OSError also covers FileNotFoundError
        # Continue without validation rather than failing
        LOG.warning(" -> Browser version detection failed, skipping validation: %s", ex)
    except Exception as ex:
        # Continue without validation rather than failing
        LOG.warning(" -> Unexpected error during browser version validation, skipping: %s", ex)
|
|
|
|
async def _validate_chrome_136_configuration(self, version_info:ChromeVersionInfo) -> None:
    """
    Checks that a user data directory is configured for a Chrome/Edge 136+ browser.

    The check passes if either a browser argument starts with ``--user-data-dir=`` or the
    ``user_data_dir`` setting of the browser configuration is a non-blank string.

    Args:
        version_info: detected browser version, only its ``browser_name`` is used for messages

    Raises:
        AssertionError: If neither the argument nor the setting is present
    """
    browser_name = version_info.browser_name

    has_arg = any(argument.startswith("--user-data-dir=") for argument in self.browser_config.arguments)
    configured_dir = self.browser_config.user_data_dir
    has_setting = configured_dir is not None and configured_dir.strip()

    if has_arg or has_setting:
        LOG.info(" -> %s 136+ configuration validation passed", browser_name)
        return

    error_message = (
        f"{browser_name} 136+ requires --user-data-dir to be specified. "
        "Add --user-data-dir=/path/to/directory to browser arguments and "
        'user_data_dir: "/path/to/directory" to your configuration.'
    )
    LOG.error(" -> %s 136+ configuration validation failed: %s", browser_name, error_message)
    raise AssertionError(error_message)
|
|
|
|
def _diagnose_chrome_version_issues(self, remote_port:int) -> None:
    """
    Logs diagnostic information about the detected browser version and its 136+ requirements.

    Version data is collected through ``get_chrome_version_diagnostic_info`` for the configured
    binary and, if a port is given, for a browser reachable on 127.0.0.1. Errors during the
    diagnostics are logged as warnings and never propagated.
    Does nothing when the ``PYTEST_CURRENT_TEST`` environment variable is set.

    Args:
        remote_port: Remote debugging port (0 if not configured)
    """
    # Skip diagnostics in test environments to avoid subprocess calls
    if os.environ.get("PYTEST_CURRENT_TEST"):
        LOG.debug(" -> Skipping browser version diagnostics in test environment")
        return

    try:
        report = get_chrome_version_diagnostic_info(
            binary_path = self.browser_config.binary_location,
            remote_host = "127.0.0.1",
            remote_port = remote_port if remote_port > 0 else None,
            remote_timeout = self._effective_timeout("chrome_remote_debugging"),
            binary_timeout = self._effective_timeout("chrome_binary_detection")
        )

        # Findings based on the browser binary
        from_binary = report["binary_detection"]
        if from_binary:
            LOG.info("(info) %s version from binary: %s %s (major: %d)",
                    from_binary["browser_name"], from_binary["browser_name"], from_binary["version_string"], from_binary["major_version"])

            if from_binary["is_chrome_136_plus"]:
                LOG.info("(info) %s 136+ detected - security validation required", from_binary["browser_name"])
            else:
                LOG.info("(info) %s pre-136 detected - no special security requirements", from_binary["browser_name"])

        # Findings based on a running browser with remote debugging
        from_remote = report["remote_detection"]
        if from_remote:
            LOG.info("(info) %s version from remote debugging: %s %s (major: %d)",
                    from_remote["browser_name"], from_remote["browser_name"], from_remote["version_string"], from_remote["major_version"])

            if from_remote["is_chrome_136_plus"]:
                LOG.info("(info) Remote %s 136+ detected - validating configuration", from_remote["browser_name"])

                # Chrome/Edge 136+ needs a user data dir, check arguments and configuration
                is_valid, error_message = validate_chrome_136_configuration(
                    list(self.browser_config.arguments),
                    self.browser_config.user_data_dir
                )

                if is_valid:
                    LOG.info("(ok) %s 136+ configuration validation passed", from_remote["browser_name"])
                else:
                    LOG.error("(fail) %s 136+ configuration validation failed: %s", from_remote["browser_name"], error_message)
                    LOG.info(" Solution: Add --user-data-dir=/path/to/directory to browser arguments")
                    LOG.info(' And user_data_dir: "/path/to/directory" to your configuration')

        # General hint that applies whenever a 136+ browser was seen
        if report["chrome_136_plus_detected"]:
            LOG.info("(info) Chrome/Edge 136+ security changes require --user-data-dir for remote debugging")
            LOG.info(" See: https://developer.chrome.com/blog/remote-debugging-port")
    except (subprocess.SubprocessError, OSError) as ex:  # OSError also covers FileNotFoundError
        # Continue without diagnostics rather than failing
        LOG.warning(" -> Browser version diagnostics failed: %s", ex)
    except Exception as ex:
        # Continue without diagnostics rather than failing
        LOG.warning(" -> Unexpected error during browser version diagnostics: %s", ex)
|