mirror of https://github.com/Second-Hand-Friends/kleinanzeigen-bot.git (synced 2026-03-12 10:31:50 +01:00)

feat: add browser profile XDG support and documentation (#777)
@@ -4,6 +4,7 @@
import asyncio, enum, inspect, json, os, platform, secrets, shutil, subprocess, urllib.request # isort: skip # noqa: S404
from collections.abc import Awaitable, Callable, Coroutine, Iterable
from gettext import gettext as _
from pathlib import Path
from typing import Any, Final, Optional, cast

try:
@@ -22,7 +23,7 @@ from nodriver.core.tab import Tab as Page
from kleinanzeigen_bot.model.config_model import Config as BotConfig
from kleinanzeigen_bot.model.config_model import TimeoutConfig

from . import files, loggers, net
from . import files, loggers, net, xdg_paths
from .chrome_version_detector import (
ChromeVersionInfo,
detect_chrome_version_from_binary,
@@ -40,6 +41,28 @@ if TYPE_CHECKING:
_KEY_VALUE_PAIR_SIZE = 2

def _resolve_user_data_dir_paths(arg_value:str, config_value:str) -> tuple[Any, Any]:
"""Resolve the argument and config user_data_dir paths for comparison."""
try:
return (
Path(arg_value).expanduser().resolve(),
Path(config_value).expanduser().resolve(),
)
except OSError as exc:
LOG.debug("Failed to resolve user_data_dir paths for comparison: %s", exc)
return None, None

def _has_non_empty_user_data_dir_arg(args:Iterable[str]) -> bool:
for arg in args:
if not arg.startswith("--user-data-dir="):
continue
raw = arg.split("=", maxsplit = 1)[1].strip().strip('"').strip("'")
if raw:
return True
return False

def _is_remote_object(obj:Any) -> TypeGuard["RemoteObject"]:
"""Type guard to check if an object is a RemoteObject."""
return hasattr(obj, "__class__") and "RemoteObject" in str(type(obj))
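
Note: a minimal sketch of how the two new user_data_dir helpers above behave; the sample values are illustrative only and are not part of the commit.

# Illustrative only: a quoted-but-empty value is treated the same as a missing argument.
assert _has_non_empty_user_data_dir_arg(["--user-data-dir=/home/alice/kab-profile"]) is True
assert _has_non_empty_user_data_dir_arg(['--user-data-dir=""', "--headless=new"]) is False
assert _has_non_empty_user_data_dir_arg(["--headless=new"]) is False

# _resolve_user_data_dir_paths() expands "~" and resolves both sides before comparison,
# so differently spelled but identical locations compare equal (assuming HOME=/home/alice);
# on OSError it returns (None, None), which the caller below logs as a mismatch.
arg_path, cfg_path = _resolve_user_data_dir_paths("~/kab-profile", "/home/alice/kab-profile")
assert arg_path == cfg_path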
@@ -58,7 +81,7 @@ __all__ = [
LOG:Final[loggers.Logger] = loggers.get_logger(__name__)

# see https://api.jquery.com/category/selectors/
METACHAR_ESCAPER:Final[dict[int, str]] = str.maketrans({ch: f"\\{ch}" for ch in '!"#$%&\'()*+,./:;<=>?@[\\]^`{|}~'})
METACHAR_ESCAPER:Final[dict[int, str]] = str.maketrans({ch: f"\\{ch}" for ch in "!\"#$%&'()*+,./:;<=>?@[\\]^`{|}~"})

def _is_admin() -> bool:
@@ -90,7 +113,6 @@ class Is(enum.Enum):

class BrowserConfig:

def __init__(self) -> None:
self.arguments:Iterable[str] = []
self.binary_location:str | None = None
@@ -102,37 +124,27 @@ class BrowserConfig:

def _write_initial_prefs(prefs_file:str) -> None:
with open(prefs_file, "w", encoding = "UTF-8") as fd:
json.dump({
"credentials_enable_service": False,
"enable_do_not_track": True,
"google": {
"services": {
"consented_to_sync": False
}
},
"profile": {
"default_content_setting_values": {
"popups": 0,
"notifications": 2 # 1 = allow, 2 = block browser notifications
json.dump(
{
"credentials_enable_service": False,
"enable_do_not_track": True,
"google": {"services": {"consented_to_sync": False}},
"profile": {
"default_content_setting_values": {
"popups": 0,
"notifications": 2, # 1 = allow, 2 = block browser notifications
},
"password_manager_enabled": False,
},
"password_manager_enabled": False
"signin": {"allowed": False},
"translate_site_blacklist": ["www.kleinanzeigen.de"],
"devtools": {"preferences": {"currentDockState": '"bottom"'}},
},
"signin": {
"allowed": False
},
"translate_site_blacklist": [
"www.kleinanzeigen.de"
],
"devtools": {
"preferences": {
"currentDockState": '"bottom"'
}
}
}, fd)
fd,
)

class WebScrapingMixin:

def __init__(self) -> None:
self.browser_config:Final[BrowserConfig] = BrowserConfig()
self.browser:Browser = None # pyright: ignore[reportAttributeAccessIssue]
@@ -140,6 +152,11 @@ class WebScrapingMixin:
self._default_timeout_config:TimeoutConfig | None = None
self.config:BotConfig = cast(BotConfig, None)

@property
def _installation_mode(self) -> str:
"""Get installation mode with fallback to portable."""
return getattr(self, "installation_mode_or_portable", "portable")

def _get_timeout_config(self) -> TimeoutConfig:
config = getattr(self, "config", None)
timeouts:TimeoutConfig | None = None
@@ -172,12 +189,7 @@ class WebScrapingMixin:
return 1 + cfg.retry_max_attempts

async def _run_with_timeout_retries(
self,
operation:Callable[[float], Awaitable[T]],
*,
description:str,
key:str = "default",
override:float | None = None
self, operation:Callable[[float], Awaitable[T]], *, description:str, key:str = "default", override:float | None = None
) -> T:
"""
Execute an async callable with retry/backoff handling for TimeoutError.
@@ -191,13 +203,7 @@ class WebScrapingMixin:
except TimeoutError:
if attempt >= attempts - 1:
raise
LOG.debug(
"Retrying %s after TimeoutError (attempt %d/%d, timeout %.1fs)",
description,
attempt + 1,
attempts,
effective_timeout
)
LOG.debug("Retrying %s after TimeoutError (attempt %d/%d, timeout %.1fs)", description, attempt + 1, attempts, effective_timeout)

raise TimeoutError(f"{description} failed without executing operation")

@@ -210,8 +216,25 @@ class WebScrapingMixin:
self.browser_config.binary_location = self.get_compatible_browser()
LOG.info(" -> Browser binary location: %s", self.browser_config.binary_location)

has_remote_debugging = any(arg.startswith("--remote-debugging-port=") for arg in self.browser_config.arguments)
is_test_environment = bool(os.environ.get("PYTEST_CURRENT_TEST"))

if (
not (self.browser_config.user_data_dir and self.browser_config.user_data_dir.strip())
and not _has_non_empty_user_data_dir_arg(self.browser_config.arguments)
and not has_remote_debugging
and not is_test_environment
):
self.browser_config.user_data_dir = str(xdg_paths.get_browser_profile_path(self._installation_mode))

# Chrome version detection and validation
await self._validate_chrome_version_configuration()
if has_remote_debugging:
try:
await self._validate_chrome_version_configuration()
except AssertionError as exc:
LOG.warning(_("Remote debugging detected, but browser configuration looks invalid: %s"), exc)
else:
await self._validate_chrome_version_configuration()

########################################################
# check if an existing browser instance shall be used...
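
In plain terms, the new default above only applies when nothing else claims the profile directory. A condensed, hypothetical sketch of that decision follows; the helper name _pick_user_data_dir is not part of this commit.

# Hypothetical condensation of the branch above, for readability only.
def _pick_user_data_dir(configured:str | None, args:list[str], has_remote_debugging:bool, is_test_environment:bool, mode:str) -> str | None:
    if configured and configured.strip():
        return configured  # an explicit browser.user_data_dir from config.yaml wins
    if _has_non_empty_user_data_dir_arg(args) or has_remote_debugging or is_test_environment:
        return None  # explicit argument, existing remote browser, or pytest: leave the value untouched
    return str(xdg_paths.get_browser_profile_path(mode))  # otherwise fall back to the portable/XDG profile dir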
@@ -229,10 +252,12 @@ class WebScrapingMixin:

# Enhanced port checking with retry logic
port_available = await self._check_port_with_retry(remote_host, remote_port)
ensure(port_available,
ensure(
port_available,
f"Browser process not reachable at {remote_host}:{remote_port}. "
f"Start the browser with --remote-debugging-port={remote_port} or remove this port from your config.yaml. "
f"Make sure the browser is running and the port is not blocked by firewall.")
f"Make sure the browser is running and the port is not blocked by firewall.",
)

try:
cfg = NodriverConfig(
@@ -255,8 +280,7 @@ class WebScrapingMixin:
LOG.error("Troubleshooting steps:")
LOG.error("1. Close all browser instances and try again")
LOG.error("2. Remove the user_data_dir configuration temporarily")
LOG.error("3. Start browser manually with: %s --remote-debugging-port=%d",
self.browser_config.binary_location, remote_port)
LOG.error("3. Start browser manually with: %s --remote-debugging-port=%d", self.browser_config.binary_location, remote_port)
LOG.error("4. Check if any antivirus or security software is blocking the connection")
raise

@@ -274,13 +298,11 @@ class WebScrapingMixin:
"--disable-sync",
"--no-experiments",
"--disable-search-engine-choice-screen",

"--disable-features=MediaRouter",
"--use-mock-keychain",

"--test-type", # https://stackoverflow.com/a/36746675/5116073
# https://chromium.googlesource.com/chromium/src/+/master/net/dns/README.md#request-remapping
'--host-resolver-rules="MAP connect.facebook.net 127.0.0.1, MAP securepubads.g.doubleclick.net 127.0.0.1, MAP www.googletagmanager.com 127.0.0.1"'
'--host-resolver-rules="MAP connect.facebook.net 127.0.0.1, MAP securepubads.g.doubleclick.net 127.0.0.1, MAP www.googletagmanager.com 127.0.0.1"',
]

is_edge = "edge" in self.browser_config.binary_location.lower()
@@ -295,10 +317,36 @@ class WebScrapingMixin:
LOG.info(" -> Browser profile name: %s", self.browser_config.profile_name)
browser_args.append(f"--profile-directory={self.browser_config.profile_name}")

user_data_dir_from_args:str | None = None
for browser_arg in self.browser_config.arguments:
LOG.info(" -> Custom Browser argument: %s", browser_arg)
if browser_arg.startswith("--user-data-dir="):
raw = browser_arg.split("=", maxsplit = 1)[1].strip().strip('"').strip("'")
if not raw:
LOG.warning(_("Ignoring empty --user-data-dir= argument; falling back to configured user_data_dir."))
continue
user_data_dir_from_args = raw
continue
browser_args.append(browser_arg)

effective_user_data_dir = user_data_dir_from_args or self.browser_config.user_data_dir
if user_data_dir_from_args and self.browser_config.user_data_dir:
arg_path, cfg_path = await asyncio.get_running_loop().run_in_executor(
None,
_resolve_user_data_dir_paths,
user_data_dir_from_args,
self.browser_config.user_data_dir,
)
if arg_path is None or cfg_path is None or arg_path != cfg_path:
LOG.warning(
_("Configured browser.user_data_dir (%s) does not match --user-data-dir argument (%s); using the argument value."),
self.browser_config.user_data_dir,
user_data_dir_from_args,
)
if not effective_user_data_dir and not is_test_environment:
effective_user_data_dir = str(xdg_paths.get_browser_profile_path(self._installation_mode))
self.browser_config.user_data_dir = effective_user_data_dir

if not loggers.is_debug(LOG):
browser_args.append("--log-level=3") # INFO: 0, WARNING: 1, ERROR: 2, FATAL: 3

@@ -309,7 +357,7 @@ class WebScrapingMixin:
headless = False,
browser_executable_path = self.browser_config.binary_location,
browser_args = browser_args,
user_data_dir = self.browser_config.user_data_dir
user_data_dir = self.browser_config.user_data_dir,
)

# already logged by nodriver:
@@ -371,8 +419,7 @@ class WebScrapingMixin:
return True

if attempt < max_retries - 1:
LOG.debug("Port %s:%s not available, retrying in %.1f seconds (attempt %d/%d)",
host, port, retry_delay, attempt + 1, max_retries)
LOG.debug("Port %s:%s not available, retrying in %.1f seconds (attempt %d/%d)", host, port, retry_delay, attempt + 1, max_retries)
await asyncio.sleep(retry_delay)

return False
@@ -522,12 +569,7 @@ class WebScrapingMixin:
browser_paths:list[str | None] = []
match platform.system():
case "Linux":
browser_paths = [
shutil.which("chromium"),
shutil.which("chromium-browser"),
shutil.which("google-chrome"),
shutil.which("microsoft-edge")
]
browser_paths = [shutil.which("chromium"), shutil.which("chromium-browser"), shutil.which("google-chrome"), shutil.which("microsoft-edge")]

case "Darwin":
browser_paths = [
@@ -540,18 +582,15 @@ class WebScrapingMixin:
browser_paths = [
os.environ.get("PROGRAMFILES", "C:\\Program Files") + r"\Microsoft\Edge\Application\msedge.exe",
os.environ.get("PROGRAMFILES(X86)", "C:\\Program Files (x86)") + r"\Microsoft\Edge\Application\msedge.exe",

os.environ["PROGRAMFILES"] + r"\Chromium\Application\chrome.exe",
os.environ["PROGRAMFILES(X86)"] + r"\Chromium\Application\chrome.exe",
os.environ["LOCALAPPDATA"] + r"\Chromium\Application\chrome.exe",

os.environ["PROGRAMFILES"] + r"\Chrome\Application\chrome.exe",
os.environ["PROGRAMFILES(X86)"] + r"\Chrome\Application\chrome.exe",
os.environ["LOCALAPPDATA"] + r"\Chrome\Application\chrome.exe",

shutil.which("msedge.exe"),
shutil.which("chromium.exe"),
shutil.which("chrome.exe")
shutil.which("chrome.exe"),
]

case _ as os_name:
@@ -563,8 +602,14 @@ class WebScrapingMixin:

raise AssertionError(_("Installed browser could not be detected"))

async def web_await(self, condition:Callable[[], T | Never | Coroutine[Any, Any, T | Never]], *,
timeout:int | float | None = None, timeout_error_message:str = "", apply_multiplier:bool = True) -> T:
async def web_await(
self,
condition:Callable[[], T | Never | Coroutine[Any, Any, T | Never]],
*,
timeout:int | float | None = None,
timeout_error_message:str = "",
apply_multiplier:bool = True,
) -> T:
"""
Blocks/waits until the given condition is met.

@@ -604,7 +649,9 @@ class WebScrapingMixin:
return elem.attrs.get("disabled") is not None

async def is_displayed(elem:Element) -> bool:
return cast(bool, await elem.apply("""
return cast(
bool,
await elem.apply("""
function (element) {
var style = window.getComputedStyle(element);
return style.display !== 'none'
@@ -613,7 +660,8 @@ class WebScrapingMixin:
&& element.offsetWidth > 0
&& element.offsetHeight > 0
}
"""))
"""),
)

elem:Element = await self.web_find(selector_type, selector_value, timeout = timeout)

@@ -627,7 +675,9 @@ class WebScrapingMixin:
case Is.READONLY:
return elem.attrs.get("readonly") is not None
case Is.SELECTED:
return cast(bool, await elem.apply("""
return cast(
bool,
await elem.apply("""
function (element) {
if (element.tagName.toLowerCase() === 'input') {
if (element.type === 'checkbox' || element.type === 'radio') {
@@ -636,7 +686,8 @@ class WebScrapingMixin:
}
return false
}
"""))
"""),
)
raise AssertionError(_("Unsupported attribute: %s") % attr)

async def web_click(self, selector_type:By, selector_value:str, *, timeout:int | float | None = None) -> Element:
@@ -743,11 +794,8 @@ class WebScrapingMixin:
async def attempt(effective_timeout:float) -> Element:
return await self._web_find_once(selector_type, selector_value, effective_timeout, parent = parent)

return await self._run_with_timeout_retries(
attempt,
description = f"web_find({selector_type.name}, {selector_value})",
key = "default",
override = timeout
return await self._run_with_timeout_retries( # noqa: E501
attempt, description = f"web_find({selector_type.name}, {selector_value})", key = "default", override = timeout
)

async def web_find_all(self, selector_type:By, selector_value:str, *, parent:Element | None = None, timeout:int | float | None = None) -> list[Element]:
@@ -762,10 +810,7 @@ class WebScrapingMixin:
return await self._web_find_all_once(selector_type, selector_value, effective_timeout, parent = parent)

return await self._run_with_timeout_retries(
attempt,
description = f"web_find_all({selector_type.name}, {selector_value})",
key = "default",
override = timeout
attempt, description = f"web_find_all({selector_type.name}, {selector_value})", key = "default", override = timeout
)

async def _web_find_once(self, selector_type:By, selector_value:str, timeout:float, *, parent:Element | None = None) -> Element:
@@ -778,40 +823,46 @@ class WebScrapingMixin:
lambda: self.page.query_selector(f"#{escaped_id}", parent),
timeout = timeout,
timeout_error_message = f"No HTML element found with ID '{selector_value}'{timeout_suffix}",
apply_multiplier = False)
apply_multiplier = False,
)
case By.CLASS_NAME:
escaped_classname = selector_value.translate(METACHAR_ESCAPER)
return await self.web_await(
lambda: self.page.query_selector(f".{escaped_classname}", parent),
timeout = timeout,
timeout_error_message = f"No HTML element found with CSS class '{selector_value}'{timeout_suffix}",
apply_multiplier = False)
apply_multiplier = False,
)
case By.TAG_NAME:
return await self.web_await(
lambda: self.page.query_selector(selector_value, parent),
timeout = timeout,
timeout_error_message = f"No HTML element found of tag <{selector_value}>{timeout_suffix}",
apply_multiplier = False)
apply_multiplier = False,
)
case By.CSS_SELECTOR:
return await self.web_await(
lambda: self.page.query_selector(selector_value, parent),
timeout = timeout,
timeout_error_message = f"No HTML element found using CSS selector '{selector_value}'{timeout_suffix}",
apply_multiplier = False)
apply_multiplier = False,
)
case By.TEXT:
ensure(not parent, f"Specifying a parent element currently not supported with selector type: {selector_type}")
return await self.web_await(
lambda: self.page.find_element_by_text(selector_value, best_match = True),
timeout = timeout,
timeout_error_message = f"No HTML element found containing text '{selector_value}'{timeout_suffix}",
apply_multiplier = False)
apply_multiplier = False,
)
case By.XPATH:
ensure(not parent, f"Specifying a parent element currently not supported with selector type: {selector_type}")
return await self.web_await(
lambda: self.page.find_element_by_text(selector_value, best_match = True),
timeout = timeout,
timeout_error_message = f"No HTML element found using XPath '{selector_value}'{timeout_suffix}",
apply_multiplier = False)
apply_multiplier = False,
)

raise AssertionError(_("Unsupported selector type: %s") % selector_type)

@@ -825,33 +876,38 @@ class WebScrapingMixin:
lambda: self.page.query_selector_all(f".{escaped_classname}", parent),
timeout = timeout,
timeout_error_message = f"No HTML elements found with CSS class '{selector_value}'{timeout_suffix}",
apply_multiplier = False)
apply_multiplier = False,
)
case By.CSS_SELECTOR:
return await self.web_await(
lambda: self.page.query_selector_all(selector_value, parent),
timeout = timeout,
timeout_error_message = f"No HTML elements found using CSS selector '{selector_value}'{timeout_suffix}",
apply_multiplier = False)
apply_multiplier = False,
)
case By.TAG_NAME:
return await self.web_await(
lambda: self.page.query_selector_all(selector_value, parent),
timeout = timeout,
timeout_error_message = f"No HTML elements found of tag <{selector_value}>{timeout_suffix}",
apply_multiplier = False)
apply_multiplier = False,
)
case By.TEXT:
ensure(not parent, f"Specifying a parent element currently not supported with selector type: {selector_type}")
return await self.web_await(
lambda: self.page.find_elements_by_text(selector_value),
timeout = timeout,
timeout_error_message = f"No HTML elements found containing text '{selector_value}'{timeout_suffix}",
apply_multiplier = False)
apply_multiplier = False,
)
case By.XPATH:
ensure(not parent, f"Specifying a parent element currently not supported with selector type: {selector_type}")
return await self.web_await(
lambda: self.page.find_elements_by_text(selector_value),
timeout = timeout,
timeout_error_message = f"No HTML elements found using XPath '{selector_value}'{timeout_suffix}",
apply_multiplier = False)
apply_multiplier = False,
)

raise AssertionError(_("Unsupported selector type: %s") % selector_type)

@@ -885,11 +941,12 @@ class WebScrapingMixin:
lambda: self.web_execute("document.readyState == 'complete'"),
timeout = page_timeout,
timeout_error_message = f"Page did not finish loading within {page_timeout} seconds.",
apply_multiplier = False
apply_multiplier = False,
)

async def web_text(self, selector_type:By, selector_value:str, *, parent:Element | None = None, timeout:int | float | None = None) -> str:
return str(await (await self.web_find(selector_type, selector_value, parent = parent, timeout = timeout)).apply("""
return str(
await (await self.web_find(selector_type, selector_value, parent = parent, timeout = timeout)).apply("""
function (elem) {
let sel = window.getSelection()
sel.removeAllRanges()
@@ -900,16 +957,19 @@ class WebScrapingMixin:
sel.removeAllRanges()
return visibleText
}
"""))
""")
)

async def web_sleep(self, min_ms:int = 1_000, max_ms:int = 2_500) -> None:
duration = max_ms <= min_ms and min_ms or secrets.randbelow(max_ms - min_ms) + min_ms
LOG.log(loggers.INFO if duration > 1_500 else loggers.DEBUG, # noqa: PLR2004 Magic value used in comparison
" ... pausing for %d ms ...", duration)
LOG.log(
loggers.INFO if duration > 1_500 else loggers.DEBUG, # noqa: PLR2004 Magic value used in comparison
" ... pausing for %d ms ...",
duration,
)
await self.page.sleep(duration / 1_000)

async def web_request(self, url:str, method:str = "GET", valid_response_codes:int | Iterable[int] = 200,
headers:dict[str, str] | None = None) -> Any:
async def web_request(self, url:str, method:str = "GET", valid_response_codes:int | Iterable[int] = 200, headers:dict[str, str] | None = None) -> Any:
method = method.upper()
LOG.debug(" -> HTTP %s [%s]...", method, url)
response = await self.web_execute(f"""
@@ -933,9 +993,10 @@ class WebScrapingMixin:
valid_response_codes = [valid_response_codes]
ensure(
response["statusCode"] in valid_response_codes,
f'Invalid response "{response["statusCode"]} response["statusMessage"]" received for HTTP {method} to {url}'
f'Invalid response "{response["statusCode"]} {response["statusMessage"]}" received for HTTP {method} to {url}',
)
return response

# pylint: enable=dangerous-default-value

async def web_scroll_page_down(self, scroll_length:int = 10, scroll_speed:int = 10_000, *, scroll_back_top:bool = False) -> None:
@@ -968,8 +1029,9 @@ class WebScrapingMixin:
:raises UnexpectedTagNameException: if element is not a <select> element
"""
await self.web_await(
lambda: self.web_check(selector_type, selector_value, Is.CLICKABLE), timeout = timeout,
timeout_error_message = f"No clickable HTML element with selector: {selector_type}='{selector_value}' found"
lambda: self.web_check(selector_type, selector_value, Is.CLICKABLE),
timeout = timeout,
timeout_error_message = f"No clickable HTML element with selector: {selector_type}='{selector_value}' found",
)
elem = await self.web_find(selector_type, selector_value, timeout = timeout)

@@ -1107,9 +1169,7 @@ class WebScrapingMixin:
if port_available:
try:
version_info = detect_chrome_version_from_remote_debugging(
remote_host,
remote_port,
timeout = self._effective_timeout("chrome_remote_debugging")
remote_host, remote_port, timeout = self._effective_timeout("chrome_remote_debugging")
)
if version_info:
LOG.debug(" -> Detected version from existing browser: %s", version_info)
@@ -1125,10 +1185,7 @@ class WebScrapingMixin:
binary_path = self.browser_config.binary_location
if binary_path:
LOG.debug(" -> No remote browser detected, trying binary detection")
version_info = detect_chrome_version_from_binary(
binary_path,
timeout = self._effective_timeout("chrome_binary_detection")
)
version_info = detect_chrome_version_from_binary(binary_path, timeout = self._effective_timeout("chrome_binary_detection"))

# Validate if Chrome 136+ detected
if version_info and version_info.is_chrome_136_plus:
@@ -1158,14 +1215,8 @@ class WebScrapingMixin:
AssertionError: If configuration is invalid
"""
# Check if user-data-dir is specified in arguments or configuration
has_user_data_dir_arg = any(
arg.startswith("--user-data-dir=")
for arg in self.browser_config.arguments
)
has_user_data_dir_config = (
self.browser_config.user_data_dir is not None and
self.browser_config.user_data_dir.strip()
)
has_user_data_dir_arg = any(arg.startswith("--user-data-dir=") for arg in self.browser_config.arguments)
has_user_data_dir_config = self.browser_config.user_data_dir is not None and bool(self.browser_config.user_data_dir.strip())

if not has_user_data_dir_arg and not has_user_data_dir_config:
error_message = (
@@ -1198,14 +1249,18 @@ class WebScrapingMixin:
remote_host = "127.0.0.1",
remote_port = remote_port if remote_port > 0 else None,
remote_timeout = self._effective_timeout("chrome_remote_debugging"),
binary_timeout = self._effective_timeout("chrome_binary_detection")
binary_timeout = self._effective_timeout("chrome_binary_detection"),
)

# Report binary detection results
if diagnostic_info["binary_detection"]:
binary_info = diagnostic_info["binary_detection"]
LOG.info("(info) %s version from binary: %s %s (major: %d)",
binary_info["browser_name"], binary_info["browser_name"], binary_info["version_string"], binary_info["major_version"])
LOG.info(
"(info) %s version from binary: %s (major: %d)",
binary_info["browser_name"],
binary_info["version_string"],
binary_info["major_version"],
)

if binary_info["is_chrome_136_plus"]:
LOG.info("(info) %s 136+ detected - security validation required", binary_info["browser_name"])
@@ -1215,17 +1270,18 @@ class WebScrapingMixin:
# Report remote detection results
if diagnostic_info["remote_detection"]:
remote_info = diagnostic_info["remote_detection"]
LOG.info("(info) %s version from remote debugging: %s %s (major: %d)",
remote_info["browser_name"], remote_info["browser_name"], remote_info["version_string"], remote_info["major_version"])
LOG.info(
"(info) %s version from remote debugging: %s (major: %d)",
remote_info["browser_name"],
remote_info["version_string"],
remote_info["major_version"],
)

if remote_info["is_chrome_136_plus"]:
LOG.info("(info) Remote %s 136+ detected - validating configuration", remote_info["browser_name"])

# Validate configuration for Chrome/Edge 136+
is_valid, error_message = validate_chrome_136_configuration(
list(self.browser_config.arguments),
self.browser_config.user_data_dir
)
is_valid, error_message = validate_chrome_136_configuration(list(self.browser_config.arguments), self.browser_config.user_data_dir)

if not is_valid:
LOG.error("(fail) %s 136+ configuration validation failed: %s", remote_info["browser_name"], error_message)

@@ -20,26 +20,26 @@ import platformdirs

from kleinanzeigen_bot.utils import loggers

LOG: Final[loggers.Logger] = loggers.get_logger(__name__)
LOG:Final[loggers.Logger] = loggers.get_logger(__name__)

APP_NAME: Final[str] = "kleinanzeigen-bot"
APP_NAME:Final[str] = "kleinanzeigen-bot"

InstallationMode = Literal["portable", "xdg"]
PathCategory = Literal["config", "cache", "state"]

def _normalize_mode(mode: str | InstallationMode) -> InstallationMode:
def _normalize_mode(mode:str | InstallationMode) -> InstallationMode:
"""Validate and normalize installation mode input."""
if mode in {"portable", "xdg"}:
return cast(InstallationMode, mode)
raise ValueError(f"Unsupported installation mode: {mode}")

def _ensure_directory(path: Path, description: str) -> None:
def _ensure_directory(path:Path, description:str) -> None:
"""Create directory and verify it exists."""
LOG.debug("Creating directory: %s", path)
try:
path.mkdir(parents=True, exist_ok=True)
path.mkdir(parents = True, exist_ok = True)
except OSError as exc:
LOG.error("Failed to create %s %s: %s", description, path, exc)
raise
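
A short, hedged sketch of how these two xdg_paths helpers compose; the temporary directory below is only there to keep the example self-contained.

import tempfile
from pathlib import Path

mode = _normalize_mode("xdg")  # returns "xdg"; _normalize_mode("flatpak") would raise ValueError
with tempfile.TemporaryDirectory() as tmp:
    # creates the directory (parents included) and raises OSError/NotADirectoryError on failure
    _ensure_directory(Path(tmp) / "browser-profile", "browser profile directory")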
@@ -47,7 +47,7 @@ def _ensure_directory(path: Path, description: str) -> None:
raise NotADirectoryError(str(path))

def get_xdg_base_dir(category: PathCategory) -> Path:
def get_xdg_base_dir(category:PathCategory) -> Path:
"""Get XDG base directory for the given category.

Args:
@@ -56,7 +56,7 @@ def get_xdg_base_dir(category: PathCategory) -> Path:
Returns:
Path to the XDG base directory for this app
"""
resolved: str | None = None
resolved:str | None = None
match category:
case "config":
resolved = platformdirs.user_config_dir(APP_NAME)
@@ -130,7 +130,7 @@ def prompt_installation_mode() -> InstallationMode:
return "portable"

if choice == "1":
mode: InstallationMode = "portable"
mode:InstallationMode = "portable"
LOG.info("User selected installation mode: %s", mode)
return mode
if choice == "2":
@@ -140,7 +140,7 @@ def prompt_installation_mode() -> InstallationMode:
print(_("Invalid choice. Please enter 1 or 2."))

def get_config_file_path(mode: str | InstallationMode) -> Path:
def get_config_file_path(mode:str | InstallationMode) -> Path:
"""Get config.yaml file path for the given mode.

Args:
@@ -156,7 +156,7 @@ def get_config_file_path(mode: str | InstallationMode) -> Path:
return config_path

def get_ad_files_search_dir(mode: str | InstallationMode) -> Path:
def get_ad_files_search_dir(mode:str | InstallationMode) -> Path:
"""Get directory to search for ad files.

Ad files are searched relative to the config file directory,
@@ -175,7 +175,7 @@ def get_ad_files_search_dir(mode: str | InstallationMode) -> Path:
return search_dir

def get_downloaded_ads_path(mode: str | InstallationMode) -> Path:
def get_downloaded_ads_path(mode:str | InstallationMode) -> Path:
"""Get downloaded ads directory path.

Args:
@@ -198,7 +198,7 @@ def get_downloaded_ads_path(mode: str | InstallationMode) -> Path:
return ads_path

def get_browser_profile_path(mode: str | InstallationMode, config_override: str | None = None) -> Path:
def get_browser_profile_path(mode:str | InstallationMode, config_override:str | None = None) -> Path:
"""Get browser profile directory path.

Args:
@@ -213,13 +213,13 @@ def get_browser_profile_path(mode: str | InstallationMode, config_override: str
"""
mode = _normalize_mode(mode)
if config_override:
profile_path = Path(config_override)
profile_path = Path(config_override).expanduser().resolve()
LOG.debug("Resolving browser profile path for mode '%s' (config override): %s", mode, profile_path)
elif mode == "portable":
profile_path = Path.cwd() / ".temp" / "browser-profile"
profile_path = (Path.cwd() / ".temp" / "browser-profile").resolve()
LOG.debug("Resolving browser profile path for mode '%s': %s", mode, profile_path)
else: # xdg
profile_path = get_xdg_base_dir("cache") / "browser-profile"
profile_path = (get_xdg_base_dir("cache") / "browser-profile").resolve()
LOG.debug("Resolving browser profile path for mode '%s': %s", mode, profile_path)

# Create directory if it doesn't exist
@@ -228,7 +228,7 @@ def get_browser_profile_path(mode: str | InstallationMode, config_override: str
return profile_path
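
A hedged usage sketch of the resolution order above; the import path assumes the module lives in kleinanzeigen_bot.utils (as the "from . import ... xdg_paths" line in web_scraping_mixin suggests), and the concrete paths depend on the platform, platformdirs and the current working directory.

from kleinanzeigen_bot.utils import xdg_paths

xdg_paths.get_browser_profile_path("portable")              # e.g. <cwd>/.temp/browser-profile
xdg_paths.get_browser_profile_path("xdg")                   # e.g. ~/.cache/kleinanzeigen-bot/browser-profile on Linux
xdg_paths.get_browser_profile_path("xdg", "~/kab-profile")  # config override wins and is expanded/resolved first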

def get_log_file_path(basename: str, mode: str | InstallationMode) -> Path:
def get_log_file_path(basename:str, mode:str | InstallationMode) -> Path:
"""Get log file path.

Args:
@@ -249,7 +249,7 @@ def get_log_file_path(basename: str, mode: str | InstallationMode) -> Path:
return log_path

def get_update_check_state_path(mode: str | InstallationMode) -> Path:
def get_update_check_state_path(mode:str | InstallationMode) -> Path:
"""Get update check state file path.

Args: