# SPDX-FileCopyrightText: © Sebastian Thomschke and contributors # SPDX-License-Identifier: AGPL-3.0-or-later # SPDX-ArtifactOfProjectHomePage: https://github.com/Second-Hand-Friends/kleinanzeigen-bot/ import asyncio, enum, inspect, json, os, platform, secrets, shutil, subprocess, urllib.request # isort: skip # noqa: S404 from collections.abc import Awaitable, Callable, Coroutine, Iterable from gettext import gettext as _ from typing import Any, Final, Optional, cast try: from typing import Never # type: ignore[attr-defined,unused-ignore] # mypy except ImportError: from typing import NoReturn as Never # Python <3.11 import nodriver, psutil # isort: skip from typing import TYPE_CHECKING, TypeGuard from nodriver.core.browser import Browser from nodriver.core.config import Config as NodriverConfig from nodriver.core.element import Element from nodriver.core.tab import Tab as Page from kleinanzeigen_bot.model.config_model import Config as BotConfig from kleinanzeigen_bot.model.config_model import TimeoutConfig from . import files, loggers, net from .chrome_version_detector import ( ChromeVersionInfo, detect_chrome_version_from_binary, detect_chrome_version_from_remote_debugging, get_chrome_version_diagnostic_info, validate_chrome_136_configuration, ) from .misc import T, ensure if TYPE_CHECKING: from nodriver.cdp.runtime import RemoteObject # Constants for RemoteObject conversion _KEY_VALUE_PAIR_SIZE = 2 def _is_remote_object(obj:Any) -> TypeGuard["RemoteObject"]: """Type guard to check if an object is a RemoteObject.""" return hasattr(obj, "__class__") and "RemoteObject" in str(type(obj)) __all__ = [ "Browser", "BrowserConfig", "By", "Element", "Page", "Is", "WebScrapingMixin", ] LOG:Final[loggers.Logger] = loggers.get_logger(__name__) # see https://api.jquery.com/category/selectors/ METACHAR_ESCAPER:Final[dict[int, str]] = str.maketrans({ch: f"\\{ch}" for ch in '!"#$%&\'()*+,./:;<=>?@[\\]^`{|}~'}) def _is_admin() -> bool: """Check if the current process is running with admin/root privileges.""" try: if hasattr(os, "geteuid"): result = os.geteuid() == 0 return bool(result) return False except AttributeError: return False class By(enum.Enum): ID = enum.auto() CLASS_NAME = enum.auto() CSS_SELECTOR = enum.auto() TAG_NAME = enum.auto() TEXT = enum.auto() XPATH = enum.auto() class Is(enum.Enum): CLICKABLE = enum.auto() DISPLAYED = enum.auto() DISABLED = enum.auto() READONLY = enum.auto() SELECTED = enum.auto() class BrowserConfig: def __init__(self) -> None: self.arguments:Iterable[str] = [] self.binary_location:str | None = None self.extensions:Iterable[str] = [] self.use_private_window:bool = True self.user_data_dir:str | None = None self.profile_name:str | None = None def _write_initial_prefs(prefs_file:str) -> None: with open(prefs_file, "w", encoding = "UTF-8") as fd: json.dump({ "credentials_enable_service": False, "enable_do_not_track": True, "google": { "services": { "consented_to_sync": False } }, "profile": { "default_content_setting_values": { "popups": 0, "notifications": 2 # 1 = allow, 2 = block browser notifications }, "password_manager_enabled": False }, "signin": { "allowed": False }, "translate_site_blacklist": [ "www.kleinanzeigen.de" ], "devtools": { "preferences": { "currentDockState": '"bottom"' } } }, fd) class WebScrapingMixin: def __init__(self) -> None: self.browser_config:Final[BrowserConfig] = BrowserConfig() self.browser:Browser = None # pyright: ignore[reportAttributeAccessIssue] self.page:Page = None # pyright: ignore[reportAttributeAccessIssue] self._default_timeout_config:TimeoutConfig | None = None self.config:BotConfig = cast(BotConfig, None) def _get_timeout_config(self) -> TimeoutConfig: config = getattr(self, "config", None) timeouts:TimeoutConfig | None = None if config is not None: timeouts = cast(Optional[TimeoutConfig], getattr(config, "timeouts", None)) if timeouts is not None: return timeouts if self._default_timeout_config is None: self._default_timeout_config = TimeoutConfig() return self._default_timeout_config def _timeout(self, key:str = "default", override:float | None = None) -> float: """ Return the base timeout (seconds) for a given key without applying multipliers. """ return self._get_timeout_config().resolve(key, override) def _effective_timeout(self, key:str = "default", override:float | None = None, *, attempt:int = 0) -> float: """ Return the effective timeout (seconds) with multiplier/backoff applied. """ return self._get_timeout_config().effective(key, override, attempt = attempt) def _timeout_attempts(self) -> int: cfg = self._get_timeout_config() if not cfg.retry_enabled: return 1 # Always perform the initial attempt plus the configured number of retries. return 1 + cfg.retry_max_attempts async def _run_with_timeout_retries( self, operation:Callable[[float], Awaitable[T]], *, description:str, key:str = "default", override:float | None = None ) -> T: """ Execute an async callable with retry/backoff handling for TimeoutError. """ attempts = self._timeout_attempts() for attempt in range(attempts): effective_timeout = self._effective_timeout(key, override, attempt = attempt) try: return await operation(effective_timeout) except TimeoutError: if attempt >= attempts - 1: raise LOG.debug( "Retrying %s after TimeoutError (attempt %d/%d, timeout %.1fs)", description, attempt + 1, attempts, effective_timeout ) raise TimeoutError(f"{description} failed without executing operation") async def create_browser_session(self) -> None: LOG.info("Creating Browser session...") if self.browser_config.binary_location: ensure(await files.exists(self.browser_config.binary_location), f"Specified browser binary [{self.browser_config.binary_location}] does not exist.") else: self.browser_config.binary_location = self.get_compatible_browser() LOG.info(" -> Browser binary location: %s", self.browser_config.binary_location) # Chrome version detection and validation await self._validate_chrome_version_configuration() ######################################################## # check if an existing browser instance shall be used... ######################################################## remote_host = "127.0.0.1" remote_port = 0 for arg in self.browser_config.arguments: if arg.startswith("--remote-debugging-host="): remote_host = arg.split("=", maxsplit = 1)[1] if arg.startswith("--remote-debugging-port="): remote_port = int(arg.split("=", maxsplit = 1)[1]) if remote_port > 0: LOG.info("Using existing browser process at %s:%s", remote_host, remote_port) # Enhanced port checking with retry logic port_available = await self._check_port_with_retry(remote_host, remote_port) ensure(port_available, f"Browser process not reachable at {remote_host}:{remote_port}. " f"Start the browser with --remote-debugging-port={remote_port} or remove this port from your config.yaml. " f"Make sure the browser is running and the port is not blocked by firewall.") try: cfg = NodriverConfig( browser_executable_path = self.browser_config.binary_location # actually not necessary but nodriver fails without ) cfg.host = remote_host cfg.port = remote_port self.browser = await nodriver.start(cfg) # type: ignore[attr-defined] LOG.info("New Browser session is %s", self.browser.websocket_url) return except Exception as e: error_msg = str(e) if "root" in error_msg.lower(): LOG.error("Failed to connect to browser. This error often occurs when:") LOG.error("1. Running as root user (try running as regular user)") LOG.error("2. Browser profile is locked or in use by another process") LOG.error("3. Insufficient permissions to access the browser profile") LOG.error("4. Browser is not properly started with remote debugging enabled") LOG.error("") LOG.error("Troubleshooting steps:") LOG.error("1. Close all browser instances and try again") LOG.error("2. Remove the user_data_dir configuration temporarily") LOG.error("3. Start browser manually with: %s --remote-debugging-port=%d", self.browser_config.binary_location, remote_port) LOG.error("4. Check if any antivirus or security software is blocking the connection") raise ######################################################## # configure and initialize new browser instance... ######################################################## # default_browser_args: @ https://github.com/ultrafunkamsterdam/nodriver/blob/main/nodriver/core/config.py # https://peter.sh/experiments/chromium-command-line-switches/ # https://github.com/GoogleChrome/chrome-launcher/blob/main/docs/chrome-flags-for-tools.md browser_args = [ # "--disable-dev-shm-usage", # https://stackoverflow.com/a/50725918/5116073 "--disable-crash-reporter", "--disable-domain-reliability", "--disable-sync", "--no-experiments", "--disable-search-engine-choice-screen", "--disable-features=MediaRouter", "--use-mock-keychain", "--test-type", # https://stackoverflow.com/a/36746675/5116073 # https://chromium.googlesource.com/chromium/src/+/master/net/dns/README.md#request-remapping '--host-resolver-rules="MAP connect.facebook.net 127.0.0.1, MAP securepubads.g.doubleclick.net 127.0.0.1, MAP www.googletagmanager.com 127.0.0.1"' ] is_edge = "edge" in self.browser_config.binary_location.lower() if is_edge: os.environ["MSEDGEDRIVER_TELEMETRY_OPTOUT"] = "1" # https://docs.microsoft.com/en-us/microsoft-edge/privacy-whitepaper/#microsoft-edge-driver if self.browser_config.use_private_window: browser_args.append("-inprivate" if is_edge else "--incognito") if self.browser_config.profile_name: LOG.info(" -> Browser profile name: %s", self.browser_config.profile_name) browser_args.append(f"--profile-directory={self.browser_config.profile_name}") for browser_arg in self.browser_config.arguments: LOG.info(" -> Custom Browser argument: %s", browser_arg) browser_args.append(browser_arg) if not loggers.is_debug(LOG): browser_args.append("--log-level=3") # INFO: 0, WARNING: 1, ERROR: 2, FATAL: 3 if self.browser_config.user_data_dir: LOG.info(" -> Browser user data dir: %s", self.browser_config.user_data_dir) cfg = NodriverConfig( headless = False, browser_executable_path = self.browser_config.binary_location, browser_args = browser_args, user_data_dir = self.browser_config.user_data_dir ) # already logged by nodriver: # LOG.debug("-> Effective browser arguments: \n\t\t%s", "\n\t\t".join(cfg.browser_args)) # Enhanced profile directory handling if cfg.user_data_dir: profile_dir = os.path.join(cfg.user_data_dir, self.browser_config.profile_name or "Default") os.makedirs(profile_dir, exist_ok = True) prefs_file = os.path.join(profile_dir, "Preferences") if not await files.exists(prefs_file): LOG.info(" -> Setting chrome prefs [%s]...", prefs_file) await asyncio.get_running_loop().run_in_executor(None, _write_initial_prefs, prefs_file) # load extensions for crx_extension in self.browser_config.extensions: LOG.info(" -> Adding Browser extension: [%s]", crx_extension) ensure(await files.exists(crx_extension), f"Configured extension-file [{crx_extension}] does not exist.") cfg.add_extension(crx_extension) try: self.browser = await nodriver.start(cfg) # type: ignore[attr-defined] LOG.info("New Browser session is %s", self.browser.websocket_url) except Exception as e: # Clean up any resources that were created during setup self._cleanup_session_resources() error_msg = str(e) if "root" in error_msg.lower(): LOG.error("Failed to start browser. This error often occurs when:") LOG.error("1. Running as root user (try running as regular user)") LOG.error("2. Browser profile is locked or in use by another process") LOG.error("3. Insufficient permissions to access the browser profile") LOG.error("4. Browser binary is not executable or missing") LOG.error("") LOG.error("Troubleshooting steps:") LOG.error("1. Close all browser instances and try again") LOG.error("2. Remove the user_data_dir configuration temporarily") LOG.error("3. Try running without profile configuration") LOG.error("4. Check browser binary permissions: %s", self.browser_config.binary_location) LOG.error("5. Check if any antivirus or security software is blocking the browser") raise async def _check_port_with_retry(self, host:str, port:int, max_retries:int = 3, retry_delay:float = 1.0) -> bool: """ Check if a port is open with retry logic. Args: host: Host to check port: Port to check max_retries: Maximum number of retry attempts retry_delay: Delay between retries in seconds Returns: True if port is open, False otherwise """ for attempt in range(max_retries): if net.is_port_open(host, port): return True if attempt < max_retries - 1: LOG.debug("Port %s:%s not available, retrying in %.1f seconds (attempt %d/%d)", host, port, retry_delay, attempt + 1, max_retries) await asyncio.sleep(retry_delay) return False def diagnose_browser_issues(self) -> None: """ Diagnose common browser connection issues and provide troubleshooting information. """ LOG.info("=== Browser Connection Diagnostics ===") # Check browser binary if self.browser_config.binary_location: if os.path.exists(self.browser_config.binary_location): LOG.info("(ok) Browser binary exists: %s", self.browser_config.binary_location) if os.access(self.browser_config.binary_location, os.X_OK): LOG.info("(ok) Browser binary is executable") else: LOG.error("(fail) Browser binary is not executable") else: LOG.error("(fail) Browser binary not found: %s", self.browser_config.binary_location) else: browser_path = self.get_compatible_browser() if browser_path: LOG.info("(ok) Auto-detected browser: %s", browser_path) # Set the binary location for Chrome version detection self.browser_config.binary_location = browser_path else: LOG.error("(fail) No compatible browser found") # Check user data directory if self.browser_config.user_data_dir: if os.path.exists(self.browser_config.user_data_dir): LOG.info("(ok) User data directory exists: %s", self.browser_config.user_data_dir) if os.access(self.browser_config.user_data_dir, os.R_OK | os.W_OK): LOG.info("(ok) User data directory is readable and writable") else: LOG.error("(fail) User data directory permissions issue") else: LOG.info("(info) User data directory does not exist (will be created): %s", self.browser_config.user_data_dir) # Check for remote debugging port remote_port = 0 for arg in self.browser_config.arguments: if arg.startswith("--remote-debugging-port="): remote_port = int(arg.split("=", maxsplit = 1)[1]) break if remote_port > 0: LOG.info("(info) Remote debugging port configured: %d", remote_port) if net.is_port_open("127.0.0.1", remote_port): LOG.info("(ok) Remote debugging port is open") # Try to get more information about the debugging endpoint try: probe_timeout = self._effective_timeout("chrome_remote_probe") response = urllib.request.urlopen(f"http://127.0.0.1:{remote_port}/json/version", timeout = probe_timeout) version_info = json.loads(response.read().decode()) LOG.info("(ok) Remote debugging API accessible - Browser: %s", version_info.get("Browser", "Unknown")) except Exception as e: LOG.warning("(fail) Remote debugging port is open but API not accessible: %s", str(e)) LOG.info(" This might indicate a browser update issue or configuration problem") else: LOG.info("(info) Remote debugging port is not open") # Check for running browser processes browser_processes = [] target_browser_name = "" # Get the target browser name for comparison if self.browser_config.binary_location: target_browser_name = os.path.basename(self.browser_config.binary_location).lower() else: try: target_browser_path = self.get_compatible_browser() target_browser_name = os.path.basename(target_browser_path).lower() except (AssertionError, TypeError): target_browser_name = "" try: for proc in psutil.process_iter(["pid", "name", "cmdline"]): try: proc_name = proc.info["name"] or "" cmdline = proc.info["cmdline"] or [] # Check if this is a browser process relevant to our diagnostics is_relevant_browser = False # Is this the target browser? is_target_browser = target_browser_name and target_browser_name in proc_name.lower() # Does it have remote debugging? has_remote_debugging = cmdline and any(arg.startswith("--remote-debugging-port=") for arg in cmdline) # Detect target browser processes for diagnostics if is_target_browser: is_relevant_browser = True # Add debugging status to the process info for better diagnostics proc.info["has_remote_debugging"] = has_remote_debugging if is_relevant_browser: browser_processes.append(proc.info) except (psutil.NoSuchProcess, psutil.AccessDenied): pass except (psutil.Error, PermissionError) as exc: LOG.warning("(warn) Unable to inspect browser processes: %s", exc) browser_processes = [] if browser_processes: LOG.info("(info) Found %d browser processes running", len(browser_processes)) for proc in browser_processes[:3]: # Show first 3 has_debugging = proc.get("has_remote_debugging", False) if has_debugging: LOG.info(" - PID %d: %s (remote debugging enabled)", proc["pid"], proc["name"]) else: LOG.warning(" - PID %d: %s (remote debugging NOT enabled)", proc["pid"], proc["name"]) else: LOG.info("(info) No browser processes currently running") if platform.system() == "Linux": if _is_admin(): LOG.error("(fail) Running as root - this can cause browser issues") # Chrome version detection and validation self._diagnose_chrome_version_issues(remote_port) LOG.info("=== End Diagnostics ===") def close_browser_session(self) -> None: if self.browser: LOG.debug("Closing Browser session...") self.page = None # pyright: ignore[reportAttributeAccessIssue] browser_process = psutil.Process(self.browser._process_pid) # noqa: SLF001 Private member accessed browser_children:list[psutil.Process] = browser_process.children() self.browser.stop() for p in browser_children: if p.is_running(): p.kill() # terminate orphaned browser processes self.browser = None # pyright: ignore[reportAttributeAccessIssue] def _cleanup_session_resources(self) -> None: """Clean up any resources that were created during session setup.""" # Reset browser and page references self.browser = None # pyright: ignore[reportAttributeAccessIssue] self.page = None # pyright: ignore[reportAttributeAccessIssue] def get_compatible_browser(self) -> str: match platform.system(): case "Linux": browser_paths = [ shutil.which("chromium"), shutil.which("chromium-browser"), shutil.which("google-chrome"), shutil.which("microsoft-edge") ] case "Darwin": browser_paths = [ "/Applications/Chromium.app/Contents/MacOS/Chromium", "/Applications/Google Chrome.app/Contents/MacOS/Google Chrome", "/Applications/Microsoft Edge.app/Contents/MacOS/Microsoft Edge", ] case "Windows": browser_paths = [ os.environ.get("PROGRAMFILES", "C:\\Program Files") + r"\Microsoft\Edge\Application\msedge.exe", os.environ.get("PROGRAMFILES(X86)", "C:\\Program Files (x86)") + r"\Microsoft\Edge\Application\msedge.exe", os.environ["PROGRAMFILES"] + r"\Chromium\Application\chrome.exe", os.environ["PROGRAMFILES(X86)"] + r"\Chromium\Application\chrome.exe", os.environ["LOCALAPPDATA"] + r"\Chromium\Application\chrome.exe", os.environ["PROGRAMFILES"] + r"\Chrome\Application\chrome.exe", os.environ["PROGRAMFILES(X86)"] + r"\Chrome\Application\chrome.exe", os.environ["LOCALAPPDATA"] + r"\Chrome\Application\chrome.exe", shutil.which("msedge.exe"), shutil.which("chromium.exe"), shutil.which("chrome.exe") ] case _ as os_name: raise AssertionError(_("Installed browser for OS %s could not be detected") % os_name) for browser_path in browser_paths: if browser_path and os.path.isfile(browser_path): return browser_path raise AssertionError(_("Installed browser could not be detected")) async def web_await(self, condition:Callable[[], T | Never | Coroutine[Any, Any, T | Never]], *, timeout:int | float | None = None, timeout_error_message:str = "", apply_multiplier:bool = True) -> T: """ Blocks/waits until the given condition is met. :param timeout: timeout in seconds (base value, multiplier applied unless disabled) :raises TimeoutError: if element could not be found within time """ loop = asyncio.get_running_loop() start_at = loop.time() base_timeout = timeout if timeout is not None else self._timeout() effective_timeout = self._effective_timeout(override = base_timeout) if apply_multiplier else base_timeout while True: await self.page ex:Exception | None = None try: result_raw = condition() result:T = cast(T, await result_raw if inspect.isawaitable(result_raw) else result_raw) if result: return result except Exception as ex1: ex = ex1 if loop.time() - start_at > effective_timeout: if ex: raise ex raise TimeoutError(timeout_error_message or f"Condition not met within {effective_timeout} seconds") await self.page.sleep(0.5) async def web_check(self, selector_type:By, selector_value:str, attr:Is, *, timeout:int | float | None = None) -> bool: """ Locates an HTML element and returns a state. :param timeout: timeout in seconds :raises TimeoutError: if element could not be found within time """ def is_disabled(elem:Element) -> bool: return elem.attrs.get("disabled") is not None async def is_displayed(elem:Element) -> bool: return cast(bool, await elem.apply(""" function (element) { var style = window.getComputedStyle(element); return style.display !== 'none' && style.visibility !== 'hidden' && style.opacity !== '0' && element.offsetWidth > 0 && element.offsetHeight > 0 } """)) elem:Element = await self.web_find(selector_type, selector_value, timeout = timeout) match attr: case Is.CLICKABLE: return not is_disabled(elem) or await is_displayed(elem) case Is.DISPLAYED: return await is_displayed(elem) case Is.DISABLED: return is_disabled(elem) case Is.READONLY: return elem.attrs.get("readonly") is not None case Is.SELECTED: return cast(bool, await elem.apply(""" function (element) { if (element.tagName.toLowerCase() === 'input') { if (element.type === 'checkbox' || element.type === 'radio') { return element.checked } } return false } """)) raise AssertionError(_("Unsupported attribute: %s") % attr) async def web_click(self, selector_type:By, selector_value:str, *, timeout:int | float | None = None) -> Element: """ Locates an HTML element by ID. :param timeout: timeout in seconds :raises TimeoutError: if element could not be found within time """ elem = await self.web_find(selector_type, selector_value, timeout = timeout) await elem.click() await self.web_sleep() return elem async def web_execute(self, jscode:str) -> Any: """ Executes the given JavaScript code in the context of the current page. Handles nodriver 0.47+ RemoteObject results by converting them to regular Python objects. Uses the RemoteObject API (value, deep_serialized_value) for proper conversion. :param jscode: JavaScript code to execute :return: The javascript's return value as a regular Python object """ # Try to get the result with return_by_value=True first result = await self.page.evaluate(jscode, await_promise = True, return_by_value = True) # If we got a RemoteObject, use the proper API to get properties if _is_remote_object(result): try: # Type cast to RemoteObject for type checker remote_obj:"RemoteObject" = result # Use the proper RemoteObject API - try to get the value directly first if hasattr(remote_obj, "value") and remote_obj.value is not None: return remote_obj.value # For complex objects, use deep_serialized_value which contains the actual data if hasattr(remote_obj, "deep_serialized_value") and remote_obj.deep_serialized_value: value = remote_obj.deep_serialized_value.value # Convert the complex nested structure to a proper dictionary return self._convert_remote_object_value(value) # Fallback to the original result return remote_obj except Exception as e: LOG.debug("Failed to extract value from RemoteObject: %s", e) return result # debug log the jscode but avoid excessive debug logging of window.scrollTo calls _prev_jscode:str = getattr(self.__class__.web_execute, "_prev_jscode", "") if not (jscode == _prev_jscode or (jscode.startswith("window.scrollTo") and _prev_jscode.startswith("window.scrollTo"))): LOG.debug("web_execute(`%s`) = `%s`", jscode, result) self.__class__.web_execute._prev_jscode = jscode # type: ignore[attr-defined] # noqa: SLF001 Private member accessed return result def _convert_remote_object_value(self, data:Any) -> Any: """ Recursively converts RemoteObject values to regular Python objects. Handles the complex nested structure from deep_serialized_value. Converts key/value lists to dictionaries and processes type/value structures. :param data: The data to convert (list, dict, or primitive) :return: Converted Python object """ if isinstance(data, list): # Check if this is a key/value list format: [["key", "value"], ...] if data and isinstance(data[0], list) and len(data[0]) == _KEY_VALUE_PAIR_SIZE: # Convert list of [key, value] pairs to dict converted_dict = {} for item in data: if len(item) == _KEY_VALUE_PAIR_SIZE: key, value = item # Handle nested structures in values if isinstance(value, dict) and "type" in value and "value" in value: # Extract the actual value from the type/value structure converted_dict[key] = self._convert_remote_object_value(value["value"]) else: converted_dict[key] = self._convert_remote_object_value(value) return converted_dict # Regular list - convert each item return [self._convert_remote_object_value(item) for item in data] if isinstance(data, dict): # Handle type/value structures: {'type': 'string', 'value': 'actual_value'} if "type" in data and "value" in data: return self._convert_remote_object_value(data["value"]) # Regular dict - convert each value return {key: self._convert_remote_object_value(value) for key, value in data.items()} # Return primitive values as-is return data async def web_find(self, selector_type:By, selector_value:str, *, parent:Element | None = None, timeout:int | float | None = None) -> Element: """ Locates an HTML element by the given selector type and value. :param timeout: timeout in seconds (base value before multiplier/backoff) :raises TimeoutError: if element could not be found within time """ async def attempt(effective_timeout:float) -> Element: return await self._web_find_once(selector_type, selector_value, effective_timeout, parent = parent) return await self._run_with_timeout_retries( attempt, description = f"web_find({selector_type.name}, {selector_value})", key = "default", override = timeout ) async def web_find_all(self, selector_type:By, selector_value:str, *, parent:Element | None = None, timeout:int | float | None = None) -> list[Element]: """ Locates multiple HTML elements by the given selector type and value. :param timeout: timeout in seconds (base value before multiplier/backoff) :raises TimeoutError: if element could not be found within time """ async def attempt(effective_timeout:float) -> list[Element]: return await self._web_find_all_once(selector_type, selector_value, effective_timeout, parent = parent) return await self._run_with_timeout_retries( attempt, description = f"web_find_all({selector_type.name}, {selector_value})", key = "default", override = timeout ) async def _web_find_once(self, selector_type:By, selector_value:str, timeout:float, *, parent:Element | None = None) -> Element: timeout_suffix = f" within {timeout} seconds." match selector_type: case By.ID: escaped_id = selector_value.translate(METACHAR_ESCAPER) return await self.web_await( lambda: self.page.query_selector(f"#{escaped_id}", parent), timeout = timeout, timeout_error_message = f"No HTML element found with ID '{selector_value}'{timeout_suffix}", apply_multiplier = False) case By.CLASS_NAME: escaped_classname = selector_value.translate(METACHAR_ESCAPER) return await self.web_await( lambda: self.page.query_selector(f".{escaped_classname}", parent), timeout = timeout, timeout_error_message = f"No HTML element found with CSS class '{selector_value}'{timeout_suffix}", apply_multiplier = False) case By.TAG_NAME: return await self.web_await( lambda: self.page.query_selector(selector_value, parent), timeout = timeout, timeout_error_message = f"No HTML element found of tag <{selector_value}>{timeout_suffix}", apply_multiplier = False) case By.CSS_SELECTOR: return await self.web_await( lambda: self.page.query_selector(selector_value, parent), timeout = timeout, timeout_error_message = f"No HTML element found using CSS selector '{selector_value}'{timeout_suffix}", apply_multiplier = False) case By.TEXT: ensure(not parent, f"Specifying a parent element currently not supported with selector type: {selector_type}") return await self.web_await( lambda: self.page.find_element_by_text(selector_value, best_match = True), timeout = timeout, timeout_error_message = f"No HTML element found containing text '{selector_value}'{timeout_suffix}", apply_multiplier = False) case By.XPATH: ensure(not parent, f"Specifying a parent element currently not supported with selector type: {selector_type}") return await self.web_await( lambda: self.page.find_element_by_text(selector_value, best_match = True), timeout = timeout, timeout_error_message = f"No HTML element found using XPath '{selector_value}'{timeout_suffix}", apply_multiplier = False) raise AssertionError(_("Unsupported selector type: %s") % selector_type) async def _web_find_all_once(self, selector_type:By, selector_value:str, timeout:float, *, parent:Element | None = None) -> list[Element]: timeout_suffix = f" within {timeout} seconds." match selector_type: case By.CLASS_NAME: escaped_classname = selector_value.translate(METACHAR_ESCAPER) return await self.web_await( lambda: self.page.query_selector_all(f".{escaped_classname}", parent), timeout = timeout, timeout_error_message = f"No HTML elements found with CSS class '{selector_value}'{timeout_suffix}", apply_multiplier = False) case By.CSS_SELECTOR: return await self.web_await( lambda: self.page.query_selector_all(selector_value, parent), timeout = timeout, timeout_error_message = f"No HTML elements found using CSS selector '{selector_value}'{timeout_suffix}", apply_multiplier = False) case By.TAG_NAME: return await self.web_await( lambda: self.page.query_selector_all(selector_value, parent), timeout = timeout, timeout_error_message = f"No HTML elements found of tag <{selector_value}>{timeout_suffix}", apply_multiplier = False) case By.TEXT: ensure(not parent, f"Specifying a parent element currently not supported with selector type: {selector_type}") return await self.web_await( lambda: self.page.find_elements_by_text(selector_value), timeout = timeout, timeout_error_message = f"No HTML elements found containing text '{selector_value}'{timeout_suffix}", apply_multiplier = False) case By.XPATH: ensure(not parent, f"Specifying a parent element currently not supported with selector type: {selector_type}") return await self.web_await( lambda: self.page.find_elements_by_text(selector_value), timeout = timeout, timeout_error_message = f"No HTML elements found using XPath '{selector_value}'{timeout_suffix}", apply_multiplier = False) raise AssertionError(_("Unsupported selector type: %s") % selector_type) async def web_input(self, selector_type:By, selector_value:str, text:str | int, *, timeout:int | float | None = None) -> Element: """ Enters text into an HTML input field. :param timeout: timeout in seconds :raises TimeoutError: if element could not be found within time """ input_field = await self.web_find(selector_type, selector_value, timeout = timeout) await input_field.clear_input() await input_field.send_keys(str(text)) await self.web_sleep() return input_field async def web_open(self, url:str, *, timeout:int | float | None = None, reload_if_already_open:bool = False) -> None: """ :param url: url to open in browser :param timeout: timespan in seconds within the page needs to be loaded (base value) :param reload_if_already_open: if False does nothing if the URL is already open in the browser :raises TimeoutException: if page did not open within given timespan """ LOG.debug(" -> Opening [%s]...", url) if not reload_if_already_open and self.page and url == self.page.url: LOG.debug(" => skipping, [%s] is already open", url) return self.page = await self.browser.get(url = url, new_tab = False, new_window = False) page_timeout = self._effective_timeout("page_load", timeout) await self.web_await( lambda: self.web_execute("document.readyState == 'complete'"), timeout = page_timeout, timeout_error_message = f"Page did not finish loading within {page_timeout} seconds.", apply_multiplier = False ) async def web_text(self, selector_type:By, selector_value:str, *, parent:Element | None = None, timeout:int | float | None = None) -> str: return str(await (await self.web_find(selector_type, selector_value, parent = parent, timeout = timeout)).apply(""" function (elem) { let sel = window.getSelection() sel.removeAllRanges() let range = document.createRange() range.selectNode(elem) sel.addRange(range) let visibleText = sel.toString().trim() sel.removeAllRanges() return visibleText } """)) async def web_sleep(self, min_ms:int = 1_000, max_ms:int = 2_500) -> None: duration = max_ms <= min_ms and min_ms or secrets.randbelow(max_ms - min_ms) + min_ms LOG.log(loggers.INFO if duration > 1_500 else loggers.DEBUG, # noqa: PLR2004 Magic value used in comparison " ... pausing for %d ms ...", duration) await self.page.sleep(duration / 1_000) async def web_request(self, url:str, method:str = "GET", valid_response_codes:int | Iterable[int] = 200, headers:dict[str, str] | None = None) -> Any: method = method.upper() LOG.debug(" -> HTTP %s [%s]...", method, url) response = await self.web_execute(f""" fetch("{url}", {{ method: "{method}", redirect: "follow", headers: {headers or {}} }}) .then(response => response.text().then(responseText => {{ headers = {{}}; response.headers.forEach((v, k) => headers[k] = v); return {{ statusCode: response.status, statusMessage: response.statusText, headers: headers, content: responseText }} }})) """) if isinstance(valid_response_codes, int): valid_response_codes = [valid_response_codes] ensure( response["statusCode"] in valid_response_codes, f'Invalid response "{response["statusCode"]} response["statusMessage"]" received for HTTP {method} to {url}' ) return response # pylint: enable=dangerous-default-value async def web_scroll_page_down(self, scroll_length:int = 10, scroll_speed:int = 10_000, *, scroll_back_top:bool = False) -> None: """ Smoothly scrolls the current web page down. :param scroll_length: the length of a single scroll iteration, determines smoothness of scrolling, lower is smoother :param scroll_speed: the speed of scrolling, higher is faster :param scroll_back_top: whether to scroll the page back to the top after scrolling to the bottom """ current_y_pos = 0 bottom_y_pos:int = await self.web_execute("document.body.scrollHeight") # get bottom position while current_y_pos < bottom_y_pos: # scroll in steps until bottom reached current_y_pos += scroll_length await self.web_execute(f"window.scrollTo(0, {current_y_pos})") # scroll one step await asyncio.sleep(scroll_length / scroll_speed) if scroll_back_top: # scroll back to top in same style while current_y_pos > 0: current_y_pos -= scroll_length await self.web_execute(f"window.scrollTo(0, {current_y_pos})") await asyncio.sleep(scroll_length / scroll_speed / 2) # double speed async def web_select(self, selector_type:By, selector_value:str, selected_value:Any, timeout:int | float | None = None) -> Element: """ Selects an