mirror of
https://github.com/Second-Hand-Friends/kleinanzeigen-bot.git
synced 2026-03-12 18:41:50 +01:00
feat: add multi-language support
This commit is contained in:
@@ -5,6 +5,7 @@ SPDX-ArtifactOfProjectHomePage: https://github.com/Second-Hand-Friends/kleinanze
|
||||
"""
|
||||
import asyncio, enum, inspect, json, logging, os, platform, secrets, shutil, time
|
||||
from collections.abc import Callable, Coroutine, Iterable
|
||||
from gettext import gettext as _
|
||||
from typing import cast, Any, Final
|
||||
|
||||
try:
|
||||
@@ -18,12 +19,9 @@ from nodriver.core.config import Config
|
||||
from nodriver.core.element import Element
|
||||
from nodriver.core.tab import Tab as Page
|
||||
|
||||
from .i18n import get_translating_logger
|
||||
from .utils import ensure, is_port_open, T
|
||||
|
||||
|
||||
LOG:Final[logging.Logger] = logging.getLogger("kleinanzeigen_bot.selenium_mixin")
|
||||
|
||||
|
||||
__all__ = [
|
||||
"Browser",
|
||||
"BrowserConfig",
|
||||
@@ -31,9 +29,11 @@ __all__ = [
|
||||
"Element",
|
||||
"Page",
|
||||
"Is",
|
||||
"WebScrapingMixin"
|
||||
"WebScrapingMixin",
|
||||
]
|
||||
|
||||
LOG:Final[logging.Logger] = get_translating_logger(__name__)
|
||||
|
||||
|
||||
class By(enum.Enum):
|
||||
ID = enum.auto()
|
||||
@@ -77,7 +77,7 @@ class WebScrapingMixin:
|
||||
ensure(os.path.exists(self.browser_config.binary_location), f"Specified browser binary [{self.browser_config.binary_location}] does not exist.")
|
||||
else:
|
||||
self.browser_config.binary_location = self.get_compatible_browser()
|
||||
LOG.info(" -> Chrome binary location: %s", self.browser_config.binary_location)
|
||||
LOG.info(" -> Browser binary location: %s", self.browser_config.binary_location)
|
||||
|
||||
########################################################
|
||||
# check if an existing browser instance shall be used...
|
||||
@@ -92,9 +92,9 @@ class WebScrapingMixin:
|
||||
|
||||
if remote_port > 0:
|
||||
LOG.info("Using existing browser process at %s:%s", remote_host, remote_port)
|
||||
if not is_port_open(remote_host, remote_port):
|
||||
raise AssertionError(f"Browser process not reachable at {remote_host}:{remote_port}. "
|
||||
+ f"Start the browser with --remote-debugging-port={remote_port} or remove this port from your config.yaml")
|
||||
ensure(is_port_open(remote_host, remote_port),
|
||||
f"Browser process not reachable at {remote_host}:{remote_port}. " +
|
||||
f"Start the browser with --remote-debugging-port={remote_port} or remove this port from your config.yaml")
|
||||
cfg = Config(
|
||||
browser_executable_path = self.browser_config.binary_location # actually not necessary but nodriver fails without
|
||||
)
|
||||
@@ -140,7 +140,7 @@ class WebScrapingMixin:
|
||||
browser_args.append(f"--profile-directory={self.browser_config.profile_name}")
|
||||
|
||||
for browser_arg in self.browser_config.arguments:
|
||||
LOG.info(" -> Custom Chrome argument: %s", browser_arg)
|
||||
LOG.info(" -> Custom Browser argument: %s", browser_arg)
|
||||
browser_args.append(browser_arg)
|
||||
|
||||
if not LOG.isEnabledFor(logging.DEBUG):
|
||||
@@ -163,8 +163,8 @@ class WebScrapingMixin:
|
||||
os.makedirs(profile_dir, exist_ok = True)
|
||||
prefs_file = os.path.join(profile_dir, "Preferences")
|
||||
if not os.path.exists(prefs_file):
|
||||
LOG.info("-> Setting chrome prefs [%s]...", prefs_file)
|
||||
with open(prefs_file, "w", encoding='UTF-8') as fd:
|
||||
LOG.info(" -> Setting chrome prefs [%s]...", prefs_file)
|
||||
with open(prefs_file, "w", encoding = 'UTF-8') as fd:
|
||||
json.dump({
|
||||
"credentials_enable_service": False,
|
||||
"enable_do_not_track": True,
|
||||
@@ -195,7 +195,7 @@ class WebScrapingMixin:
|
||||
|
||||
# load extensions
|
||||
for crx_extension in self.browser_config.extensions:
|
||||
LOG.info(" -> Adding extension: [%s]", crx_extension)
|
||||
LOG.info(" -> Adding Browser extension: [%s]", crx_extension)
|
||||
ensure(os.path.exists(crx_extension), f"Configured extension-file [{crx_extension}] does not exist.")
|
||||
cfg.add_extension(crx_extension)
|
||||
|
||||
@@ -250,15 +250,15 @@ class WebScrapingMixin:
|
||||
]
|
||||
|
||||
case _ as os_name:
|
||||
raise AssertionError(f"Installed browser for OS [{os_name}] could not be detected")
|
||||
raise AssertionError(_("Installed browser for OS %s could not be detected") % os_name)
|
||||
|
||||
for browser_path in browser_paths:
|
||||
if browser_path and os.path.isfile(browser_path):
|
||||
return browser_path
|
||||
|
||||
raise AssertionError("Installed browser could not be detected")
|
||||
raise AssertionError(_("Installed browser could not be detected"))
|
||||
|
||||
async def web_await(self, condition: Callable[[], T | Never | Coroutine[Any,Any,T | Never]], *,
|
||||
async def web_await(self, condition: Callable[[], T | Never | Coroutine[Any, Any, T | Never]], *,
|
||||
timeout:int | float = 5, timeout_error_message: str = "") -> T:
|
||||
"""
|
||||
Blocks/waits until the given condition is met.
|
||||
@@ -307,6 +307,7 @@ class WebScrapingMixin:
|
||||
&& element.offsetHeight > 0
|
||||
}
|
||||
"""))
|
||||
|
||||
elem:Element = await self.web_find(selector_type, selector_value, timeout = timeout)
|
||||
|
||||
match attr:
|
||||
@@ -329,7 +330,7 @@ class WebScrapingMixin:
|
||||
return false
|
||||
}
|
||||
"""))
|
||||
raise AssertionError(f"Unsupported attribute: {attr}")
|
||||
raise AssertionError(_("Unsupported attribute: %s") % attr)
|
||||
|
||||
async def web_click(self, selector_type:By, selector_value:str, *, timeout:int | float = 5) -> Element:
|
||||
"""
|
||||
@@ -380,21 +381,19 @@ class WebScrapingMixin:
|
||||
timeout = timeout,
|
||||
timeout_error_message = f"No HTML element found using CSS selector '{selector_value}' within {timeout} seconds.")
|
||||
case By.TEXT:
|
||||
if parent:
|
||||
raise AssertionError(f"Specifying a parent element currently not supported with selector type: {selector_type}")
|
||||
ensure(not parent, f"Specifying a parent element currently not supported with selector type: {selector_type}")
|
||||
return await self.web_await(
|
||||
lambda: self.page.find_element_by_text(selector_value, True),
|
||||
timeout = timeout,
|
||||
timeout_error_message = f"No HTML element found containing text '{selector_value}' within {timeout} seconds.")
|
||||
case By.XPATH:
|
||||
if parent:
|
||||
raise AssertionError(f"Specifying a parent element currently not supported with selector type: {selector_type}")
|
||||
ensure(not parent, f"Specifying a parent element currently not supported with selector type: {selector_type}")
|
||||
return await self.web_await(
|
||||
lambda: self.page.find_element_by_text(selector_value, True),
|
||||
timeout = timeout,
|
||||
timeout_error_message = f"No HTML element found using XPath '{selector_value}' within {timeout} seconds.")
|
||||
|
||||
raise AssertionError(f"Unsupported selector type: {selector_type}")
|
||||
raise AssertionError(_("Unsupported selector type: %s") % selector_type)
|
||||
|
||||
async def web_find_all(self, selector_type:By, selector_value:str, *, parent:Element = None, timeout:int | float = 5) -> list[Element]:
|
||||
"""
|
||||
@@ -420,21 +419,19 @@ class WebScrapingMixin:
|
||||
timeout = timeout,
|
||||
timeout_error_message = f"No HTML elements found of tag <{selector_value}> within {timeout} seconds.")
|
||||
case By.TEXT:
|
||||
if parent:
|
||||
raise AssertionError(f"Specifying a parent element currently not supported with selector type: {selector_type}")
|
||||
ensure(not parent, f"Specifying a parent element currently not supported with selector type: {selector_type}")
|
||||
return await self.web_await(
|
||||
lambda: self.page.find_elements_by_text(selector_value),
|
||||
timeout = timeout,
|
||||
timeout_error_message = f"No HTML elements found containing text '{selector_value}' within {timeout} seconds.")
|
||||
case By.XPATH:
|
||||
if parent:
|
||||
raise AssertionError(f"Specifying a parent element currently not supported with selector type: {selector_type}")
|
||||
ensure(not parent, f"Specifying a parent element currently not supported with selector type: {selector_type}")
|
||||
return await self.web_await(
|
||||
lambda: self.page.find_elements_by_text(selector_value),
|
||||
timeout = timeout,
|
||||
timeout_error_message = f"No HTML elements found using XPath '{selector_value}' within {timeout} seconds.")
|
||||
|
||||
raise AssertionError(f"Unsupported selector type: {selector_type}")
|
||||
raise AssertionError(_("Unsupported selector type: %s") % selector_type)
|
||||
|
||||
async def web_input(self, selector_type:By, selector_value:str, text:str | int, *, timeout:int | float = 5) -> Element:
|
||||
"""
|
||||
@@ -503,7 +500,7 @@ class WebScrapingMixin:
|
||||
content: responseText
|
||||
}}
|
||||
}}))
|
||||
""", await_promise=True))
|
||||
""", await_promise = True))
|
||||
if isinstance(valid_response_codes, int):
|
||||
valid_response_codes = [valid_response_codes]
|
||||
ensure(
|
||||
|
||||
Reference in New Issue
Block a user