mirror of
https://github.com/Second-Hand-Friends/kleinanzeigen-bot.git
synced 2026-03-12 10:31:50 +01:00
refact: reorganize utility modules
This commit is contained in:
3
src/kleinanzeigen_bot/utils/__init__.py
Normal file
3
src/kleinanzeigen_bot/utils/__init__.py
Normal file
@@ -0,0 +1,3 @@
|
||||
"""
|
||||
This module contains generic, reusable code.
|
||||
"""
|
||||
120
src/kleinanzeigen_bot/utils/dicts.py
Normal file
120
src/kleinanzeigen_bot/utils/dicts.py
Normal file
@@ -0,0 +1,120 @@
|
||||
"""
|
||||
SPDX-FileCopyrightText: © Sebastian Thomschke and contributors
|
||||
SPDX-License-Identifier: AGPL-3.0-or-later
|
||||
SPDX-ArtifactOfProjectHomePage: https://github.com/Second-Hand-Friends/kleinanzeigen-bot/
|
||||
"""
|
||||
import copy, json, os
|
||||
from collections.abc import Callable
|
||||
from importlib.resources import read_text as get_resource_as_string
|
||||
from gettext import gettext as _
|
||||
from types import ModuleType
|
||||
from typing import Any, Final
|
||||
|
||||
from ruamel.yaml import YAML
|
||||
from . import files, loggers # pylint: disable=cyclic-import
|
||||
|
||||
LOG:Final[loggers.Logger] = loggers.get_logger(__name__)
|
||||
|
||||
|
||||
def apply_defaults(
    target:dict[Any, Any],
    defaults:dict[Any, Any],
    ignore:Callable[[Any, Any], bool] = lambda _k, _v: False,
    override:Callable[[Any, Any], bool] = lambda _k, _v: False
) -> dict[Any, Any]:
    """
    Recursively merges `defaults` into `target` (in place) and returns `target`.

    :param target: the dictionary to be completed; it is modified in place
    :param defaults: the default values; values are deep-copied before being stored in `target`
    :param ignore: called as `ignore(key, default_value)` for keys missing in `target`;
        if it returns True the default is not applied
    :param override: called as `override(key, current_value)` for keys present in `target`;
        if it returns True the current value is replaced by the default
    :return: the (modified) `target` dictionary

    >>> apply_defaults({}, {"foo": "bar"})
    {'foo': 'bar'}
    >>> apply_defaults({"foo": "foo"}, {"foo": "bar"})
    {'foo': 'foo'}
    >>> apply_defaults({"foo": ""}, {"foo": "bar"})
    {'foo': ''}
    >>> apply_defaults({}, {"foo": "bar"}, ignore = lambda k, _: k == "foo")
    {}
    >>> apply_defaults({"foo": ""}, {"foo": "bar"}, override = lambda _, v: v == "")
    {'foo': 'bar'}
    >>> apply_defaults({"foo": None}, {"foo": "bar"}, override = lambda _, v: v == "")
    {'foo': None}
    >>> apply_defaults({"a": {"foo": ""}}, {"a": {"foo": "bar"}}, override = lambda _, v: v == "")
    {'a': {'foo': 'bar'}}
    """
    for key, default_value in defaults.items():
        if key in target:
            if isinstance(target[key], dict) and isinstance(default_value, dict):
                # both sides are dicts: merge recursively, propagating BOTH predicates
                # so that nested values follow the same ignore/override rules
                apply_defaults(target[key], default_value, ignore = ignore, override = override)
            elif override(key, target[key]):
                target[key] = copy.deepcopy(default_value)
        elif not ignore(key, default_value):
            # deep copy so that later changes to `target` never leak into `defaults`
            target[key] = copy.deepcopy(default_value)
    return target
|
||||
|
||||
|
||||
def load_dict(filepath:str, content_label:str = "") -> dict[str, Any]:
    """
    Loads the given JSON/YAML file via `load_dict_if_exists` and insists on a result.

    :param filepath: path of the *.json, *.yaml or *.yml file to load
    :param content_label: optional description of the content, only used for the log message
    :raises FileNotFoundError: if `load_dict_if_exists` returned None for `filepath`
    """
    loaded = load_dict_if_exists(filepath, content_label)
    if loaded is not None:
        return loaded
    raise FileNotFoundError(filepath)
|
||||
|
||||
|
||||
def load_dict_if_exists(filepath:str, content_label:str = "") -> dict[str, Any] | None:
    """
    Loads the given JSON/YAML file if it exists.

    :param filepath: path of the file; its extension must be .json, .yaml or .yml
    :param content_label: optional description of the content, only used for the log message
    :return: the parsed content, or None if the file does not exist
    :raises ValueError: if the file extension is not supported
    """
    abs_filepath = files.abspath(filepath)
    label_prefix = content_label + _(" from ") if content_label else ""
    LOG.info("Loading %s[%s]...", label_prefix, abs_filepath)

    file_ext = os.path.splitext(filepath)[1]
    if file_ext not in (".json", ".yaml", ".yml"):
        raise ValueError(_('Unsupported file type. The filename "%s" must end with *.json, *.yaml, or *.yml') % filepath)

    if not os.path.exists(filepath):
        return None

    with open(filepath, encoding = "utf-8") as file:
        if filepath.endswith(".json"):
            return json.load(file)  # type: ignore[no-any-return] # mypy
        return YAML().load(file)  # type: ignore[no-any-return] # mypy
|
||||
|
||||
|
||||
def load_dict_from_module(module:ModuleType, filename:str, content_label:str = "") -> dict[str, Any]:
    """
    Loads a JSON/YAML resource that is bundled inside the given module/package.

    :param module: the module/package containing the resource
    :param filename: name of the resource; its extension must be .json, .yaml or .yml
    :param content_label: optional description of the content, only used for the log message
    :raises ValueError: if the file extension is not supported
    :raises FileNotFoundError
    """
    label_prefix = content_label + " from " if content_label else ""
    LOG.debug("Loading %s[%s.%s]...", label_prefix, module.__name__, filename)

    file_ext = os.path.splitext(filename)[1]
    if file_ext not in (".json", ".yaml", ".yml"):
        raise ValueError(f'Unsupported file type. The filename "{filename}" must end with *.json, *.yaml, or *.yml')

    content = get_resource_as_string(module, filename)  # pylint: disable=deprecated-method
    if filename.endswith(".json"):
        return json.loads(content)  # type: ignore[no-any-return] # mypy
    return YAML().load(content)  # type: ignore[no-any-return] # mypy
|
||||
|
||||
|
||||
def save_dict(filepath:str, content:dict[str, Any]) -> None:
    """
    Writes `content` to the given file, as JSON if the path ends with ".json", otherwise as YAML.

    :param filepath: target file; it is made absolute via `files.abspath` before writing
    :param content: the dictionary to serialize
    """
    filepath = files.abspath(filepath)
    LOG.info("Saving [%s]...", filepath)
    with open(filepath, "w", encoding = "utf-8") as file:
        if filepath.endswith(".json"):
            file.write(json.dumps(content, indent = 2, ensure_ascii = False))
            return

        def represent_str(dumper:Any, data:str) -> Any:
            # use YAML | block style for multi-line strings
            return dumper.represent_scalar('tag:yaml.org,2002:str', data, style = '|' if '\n' in data else None)

        yaml = YAML()
        yaml.indent(mapping = 2, sequence = 4, offset = 2)
        yaml.representer.add_representer(str, represent_str)
        yaml.allow_duplicate_keys = False
        yaml.explicit_start = False
        yaml.dump(content, file)
|
||||
|
||||
|
||||
def safe_get(a_map:dict[Any, Any], *keys:str) -> Any:
    """
    Walks down nested dictionaries following `keys` and returns the value found there.

    :param a_map: the (possibly nested) dictionary to read from
    :param keys: the path of keys to follow; without keys `a_map` itself is returned
    :return: the value at the given path, or None if any step of the path is missing
        or an intermediate value cannot be indexed (including an empty or None `a_map`)

    >>> safe_get({"foo": {}}, "foo", "bar") is None
    True
    >>> safe_get({"foo": {"bar": "some_value"}}, "foo", "bar")
    'some_value'
    >>> safe_get({}, "foo") is None
    True
    >>> safe_get(None, "foo") is None
    True
    """
    current:Any = a_map
    for key in keys:
        try:
            current = current[key]
        except (KeyError, TypeError):
            # KeyError: key is missing; TypeError: current value is not subscriptable (e.g. None)
            return None
    return current
|
||||
28
src/kleinanzeigen_bot/utils/error_handlers.py
Normal file
28
src/kleinanzeigen_bot/utils/error_handlers.py
Normal file
@@ -0,0 +1,28 @@
|
||||
"""
|
||||
SPDX-FileCopyrightText: © Sebastian Thomschke and contributors
|
||||
SPDX-License-Identifier: AGPL-3.0-or-later
|
||||
SPDX-ArtifactOfProjectHomePage: https://github.com/Second-Hand-Friends/kleinanzeigen-bot/
|
||||
"""
|
||||
import sys, traceback
|
||||
from types import FrameType, TracebackType
|
||||
from typing import Any, Final
|
||||
|
||||
from . import loggers
|
||||
|
||||
LOG:Final[loggers.Logger] = loggers.get_logger(__name__)
|
||||
|
||||
|
||||
def on_exception(ex_type:type[BaseException], ex_value:Any, ex_traceback:TracebackType | None) -> None:
|
||||
if issubclass(ex_type, KeyboardInterrupt):
|
||||
sys.__excepthook__(ex_type, ex_value, ex_traceback)
|
||||
elif loggers.is_debug(LOG) or isinstance(ex_value, (AttributeError, ImportError, NameError, TypeError)):
|
||||
LOG.error("".join(traceback.format_exception(ex_type, ex_value, ex_traceback)))
|
||||
elif isinstance(ex_value, AssertionError):
|
||||
LOG.error(ex_value)
|
||||
else:
|
||||
LOG.error("%s: %s", ex_type.__name__, ex_value)
|
||||
|
||||
|
||||
def on_sigint(_sig:int, _frame:FrameType | None) -> None:
    """
    Signal handler (both arguments are unused): logs a warning and terminates the process with exit code 0.
    """
    LOG.warning("Aborted on user request.")
    sys.exit(0)
|
||||
22
src/kleinanzeigen_bot/utils/files.py
Normal file
22
src/kleinanzeigen_bot/utils/files.py
Normal file
@@ -0,0 +1,22 @@
|
||||
"""
|
||||
SPDX-FileCopyrightText: © Sebastian Thomschke and contributors
|
||||
SPDX-License-Identifier: AGPL-3.0-or-later
|
||||
SPDX-ArtifactOfProjectHomePage: https://github.com/Second-Hand-Friends/kleinanzeigen-bot/
|
||||
"""
|
||||
import os
|
||||
|
||||
|
||||
def abspath(relative_path:str, relative_to:str | None = None) -> str:
|
||||
"""
|
||||
Makes a given relative path absolute based on another file/folder
|
||||
"""
|
||||
if not relative_to:
|
||||
return os.path.abspath(relative_path)
|
||||
|
||||
if os.path.isabs(relative_path):
|
||||
return relative_path
|
||||
|
||||
if os.path.isfile(relative_to):
|
||||
relative_to = os.path.dirname(relative_to)
|
||||
|
||||
return os.path.normpath(os.path.join(relative_to, relative_path))
|
||||
199
src/kleinanzeigen_bot/utils/i18n.py
Normal file
199
src/kleinanzeigen_bot/utils/i18n.py
Normal file
@@ -0,0 +1,199 @@
|
||||
"""
|
||||
SPDX-FileCopyrightText: © Sebastian Thomschke and contributors
|
||||
SPDX-License-Identifier: AGPL-3.0-or-later
|
||||
SPDX-ArtifactOfProjectHomePage: https://github.com/Second-Hand-Friends/kleinanzeigen-bot/
|
||||
"""
|
||||
import ctypes, gettext, inspect, locale, logging, os, sys
|
||||
from collections.abc import Sized
|
||||
from typing import Any, Final, NamedTuple
|
||||
|
||||
from kleinanzeigen_bot import resources
|
||||
from . import reflect
|
||||
from . import dicts
|
||||
|
||||
__all__ = [
|
||||
"Locale",
|
||||
"get_current_locale",
|
||||
"pluralize",
|
||||
"set_current_locale",
|
||||
"translate"
|
||||
]
|
||||
|
||||
LOG:Final[logging.Logger] = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class Locale(NamedTuple):
|
||||
|
||||
language:str # Language code (e.g., "en", "de")
|
||||
region:str | None = None # Region code (e.g., "US", "DE")
|
||||
encoding:str = "UTF-8" # Encoding format (e.g., "UTF-8")
|
||||
|
||||
def __str__(self) -> str:
|
||||
"""
|
||||
>>> str(Locale("en", "US", "UTF-8"))
|
||||
'en_US.UTF-8'
|
||||
>>> str(Locale("en", "US"))
|
||||
'en_US.UTF-8'
|
||||
>>> str(Locale("en"))
|
||||
'en.UTF-8'
|
||||
>>> str(Locale("de", None, "UTF-8"))
|
||||
'de.UTF-8'
|
||||
"""
|
||||
region_part = f"_{self.region}" if self.region else ""
|
||||
encoding_part = f".{self.encoding}" if self.encoding else ""
|
||||
return f"{self.language}{region_part}{encoding_part}"
|
||||
|
||||
@staticmethod
|
||||
def of(locale_string: str) -> 'Locale':
|
||||
"""
|
||||
>>> Locale.of("en_US.UTF-8")
|
||||
Locale(language='en', region='US', encoding='UTF-8')
|
||||
>>> Locale.of("de.UTF-8")
|
||||
Locale(language='de', region=None, encoding='UTF-8')
|
||||
>>> Locale.of("de_DE")
|
||||
Locale(language='de', region='DE', encoding='UTF-8')
|
||||
>>> Locale.of("en")
|
||||
Locale(language='en', region=None, encoding='UTF-8')
|
||||
>>> Locale.of("en.UTF-8")
|
||||
Locale(language='en', region=None, encoding='UTF-8')
|
||||
"""
|
||||
parts = locale_string.split(".")
|
||||
language_and_region = parts[0]
|
||||
encoding = parts[1].upper() if len(parts) > 1 else "UTF-8"
|
||||
|
||||
parts = language_and_region.split("_")
|
||||
language = parts[0]
|
||||
region = parts[1].upper() if len(parts) > 1 else None
|
||||
|
||||
return Locale(language = language, region = region, encoding = encoding)
|
||||
|
||||
|
||||
def _detect_locale() -> Locale:
    """
    Detects the system locale.

    - If the LANG environment variable is set (on any OS), it is parsed via `Locale.of`.
    - Otherwise, on Windows, the default UI language is queried from the Windows API via ctypes.
    - If nothing could be determined, en_US.UTF-8 is used.

    Returns:
        the detected Locale, e.g. Locale("en", "US", "UTF-8")
    """
    lang = os.environ.get("LANG", None)

    if os.name == "nt" and not lang:  # Windows
        try:
            ui_language_id = ctypes.windll.kernel32.GetUserDefaultUILanguage()  # type: ignore[attr-defined,unused-ignore] # mypy
            lang = locale.windows_locale.get(ui_language_id, "en_US")
        except Exception:
            LOG.warning("Error detecting language on Windows", exc_info = True)

    if not lang:
        return Locale("en", "US", "UTF-8")
    return Locale.of(lang)
|
||||
|
||||
|
||||
# the locale currently in effect; detected once when this module is imported
_CURRENT_LOCALE: Locale = _detect_locale()
# translations of the current language; None means "not loaded yet" (translate() loads them on first use)
_TRANSLATIONS: dict[str, Any] | None = None
|
||||
|
||||
|
||||
def translate(text:object, caller: inspect.FrameInfo | None) -> str:
|
||||
text = str(text)
|
||||
if not caller:
|
||||
return text
|
||||
|
||||
global _TRANSLATIONS
|
||||
if _TRANSLATIONS is None:
|
||||
try:
|
||||
_TRANSLATIONS = dicts.load_dict_from_module(resources, f"translations.{_CURRENT_LOCALE[0]}.yaml")
|
||||
except FileNotFoundError:
|
||||
_TRANSLATIONS = {}
|
||||
|
||||
if not _TRANSLATIONS:
|
||||
return text
|
||||
|
||||
module_name = caller.frame.f_globals.get('__name__') # pylint: disable=redefined-outer-name
|
||||
file_basename = os.path.splitext(os.path.basename(caller.filename))[0]
|
||||
if module_name and module_name.endswith(f".{file_basename}"):
|
||||
module_name = module_name[:-(len(file_basename) + 1)]
|
||||
file_key = f"{file_basename}.py" if module_name == file_basename else f"{module_name}/{file_basename}.py"
|
||||
translation = dicts.safe_get(_TRANSLATIONS,
|
||||
file_key,
|
||||
caller.function,
|
||||
text
|
||||
)
|
||||
return translation if translation else text
|
||||
|
||||
|
||||
# replace gettext.gettext with custom _translate function
_original_gettext = gettext.gettext
gettext.gettext = lambda message: translate(_original_gettext(message), reflect.get_caller())

# Modules imported before this one may already hold a direct reference to the original function
# (e.g. via `from gettext import gettext as _`), so those references are re-bound as well.
# Iterate over a snapshot: hasattr()/getattr() on a module may trigger further imports, which would
# otherwise raise "RuntimeError: dictionary changed size during iteration".
for module_name, module in list(sys.modules.items()):
    if module is None or module_name in sys.builtin_module_names:
        continue
    if hasattr(module, '_') and getattr(module, '_') is _original_gettext:
        setattr(module, '_', gettext.gettext)
    if hasattr(module, 'gettext') and getattr(module, 'gettext') is _original_gettext:
        setattr(module, 'gettext', gettext.gettext)
|
||||
|
||||
|
||||
def get_current_locale() -> Locale:
    """
    Returns the module-wide locale currently in effect (see `set_current_locale`).
    """
    return _CURRENT_LOCALE
|
||||
|
||||
|
||||
def set_current_locale(new_locale:Locale) -> None:
    """
    Replaces the module-wide current locale.

    If the language differs from the previous one, the cached translations are dropped
    so that they get reloaded on the next call of `translate`.
    """
    global _CURRENT_LOCALE, _TRANSLATIONS
    language_changed = new_locale.language != _CURRENT_LOCALE.language
    _CURRENT_LOCALE = new_locale
    if language_changed:
        _TRANSLATIONS = None
|
||||
|
||||
|
||||
def pluralize(noun:str, count:int | Sized, prefix_with_count:bool = True) -> str:
    """
    Returns the (translated) noun in singular or plural form depending on `count`,
    using very simplified German or English pluralization rules.

    :param noun: the singular noun; it is translated in the context of the calling function first
    :param count: the number of items, or a sized collection whose length is used
    :param prefix_with_count: if True the result is prefixed with "<count> "

    >>> set_current_locale(Locale("en"))  # Setup for doctests
    >>> pluralize("field", 1)
    '1 field'
    >>> pluralize("field", 2)
    '2 fields'
    >>> pluralize("field", 2, prefix_with_count = False)
    'fields'
    """
    # must be called directly from here: the translation lookup depends on the stack depth
    noun = translate(noun, reflect.get_caller())

    if isinstance(count, Sized):
        count = len(count)

    prefix = f"{count} " if prefix_with_count else ""

    if count == 1:
        return f"{prefix}{noun}"

    # German
    if _CURRENT_LOCALE.language == "de":
        # Special cases
        irregular_plurals = {
            "Attribute": "Attribute",
            "Bild": "Bilder",
            "Feld": "Felder",
        }
        if noun in irregular_plurals:
            return f"{prefix}{irregular_plurals[noun]}"
        # compound nouns ending with an irregular noun, e.g. Eingabefeld -> Eingabefelder.
        # Both sides are lower-cased; comparing against the capitalized key could never match.
        for singular_suffix, plural_suffix in irregular_plurals.items():
            if noun.lower().endswith(singular_suffix.lower()):
                pluralized = noun[:-len(singular_suffix)] + plural_suffix.lower()
                return f"{prefix}{pluralized}"

        # Very simplified German rules
        if noun.endswith("ei"):
            return f"{prefix}{noun}en"  # Datei -> Dateien
        if noun.endswith("e"):
            return f"{prefix}{noun}n"  # Blume -> Blumen
        if noun.endswith(("el", "er", "en")):
            return f"{prefix}{noun}"  # Keller -> Keller
        if noun[-1] in "aeiou":
            return f"{prefix}{noun}s"  # Auto -> Autos
        return f"{prefix}{noun}e"  # Hund -> Hunde

    # English
    if len(noun) < 2:
        return f"{prefix}{noun}s"
    if noun.endswith(('s', 'sh', 'ch', 'x', 'z')):
        return f"{prefix}{noun}es"
    if noun.endswith('y') and noun[-2].lower() not in "aeiou":
        return f"{prefix}{noun[:-1]}ies"
    return f"{prefix}{noun}s"
|
||||
116
src/kleinanzeigen_bot/utils/loggers.py
Normal file
116
src/kleinanzeigen_bot/utils/loggers.py
Normal file
@@ -0,0 +1,116 @@
|
||||
"""
|
||||
SPDX-FileCopyrightText: © Sebastian Thomschke and contributors
|
||||
SPDX-License-Identifier: AGPL-3.0-or-later
|
||||
SPDX-ArtifactOfProjectHomePage: https://github.com/Second-Hand-Friends/kleinanzeigen-bot/
|
||||
"""
|
||||
import copy, logging, re, sys
|
||||
from gettext import gettext as _
|
||||
from typing import Any, Final # @UnusedImport
|
||||
|
||||
import colorama
|
||||
from . import i18n, reflect
|
||||
|
||||
__all__ = [
|
||||
"Logger",
|
||||
"LOG_ROOT",
|
||||
"DEBUG",
|
||||
"INFO",
|
||||
"configure_console_logging",
|
||||
"flush_all_handlers",
|
||||
"get_logger"
|
||||
]
|
||||
|
||||
# aliases of the corresponding `logging` members, exported via __all__
Logger = logging.Logger
DEBUG:Final[int] = logging.DEBUG
INFO:Final[int] = logging.INFO

# the root logger; console handlers are attached to it by configure_console_logging()
LOG_ROOT:Final[logging.Logger] = logging.getLogger()
|
||||
|
||||
|
||||
def configure_console_logging() -> None:
    """
    Attaches two colorizing console handlers to the root logger:
    one for records up to level INFO and one for records of level WARNING and above.
    """

    class CustomFormatter(logging.Formatter):
        """
        Formatter that translates the level name and colorizes level name, message
        and values enclosed in [...], "..." or '...'.
        """
        LEVEL_COLORS = {
            logging.DEBUG: colorama.Fore.BLACK + colorama.Style.BRIGHT,
            logging.INFO: colorama.Fore.BLACK + colorama.Style.BRIGHT,
            logging.WARNING: colorama.Fore.YELLOW,
            logging.ERROR: colorama.Fore.RED,
            logging.CRITICAL: colorama.Fore.RED,
        }
        MESSAGE_COLORS = {
            logging.DEBUG: colorama.Fore.BLACK + colorama.Style.BRIGHT,
            logging.INFO: colorama.Fore.RESET,
            logging.WARNING: colorama.Fore.YELLOW,
            logging.ERROR: colorama.Fore.RED,
            logging.CRITICAL: colorama.Fore.RED + colorama.Style.BRIGHT,
        }
        VALUE_COLORS = {
            logging.DEBUG: colorama.Fore.BLACK + colorama.Style.BRIGHT,
            logging.INFO: colorama.Fore.MAGENTA,
            logging.WARNING: colorama.Fore.MAGENTA,
            logging.ERROR: colorama.Fore.MAGENTA,
            logging.CRITICAL: colorama.Fore.MAGENTA,
        }

        def format(self, record:logging.LogRecord) -> str:
            # Work on a copy so that other handlers still see the unmodified record.
            # A shallow copy is sufficient because only attributes are re-assigned below;
            # a deep copy fails for records carrying exc_info (traceback objects cannot be
            # deep-copied) or args that are not deep-copyable.
            record = copy.copy(record)

            level_color = self.LEVEL_COLORS.get(record.levelno, "")
            msg_color = self.MESSAGE_COLORS.get(record.levelno, "")
            value_color = self.VALUE_COLORS.get(record.levelno, "")

            # translate and colorize log level name
            levelname = _(record.levelname) if record.levelno > logging.DEBUG else record.levelname
            record.levelname = f"{level_color}[{levelname}]{colorama.Style.RESET_ALL}"

            # highlight message values enclosed by [...], "...", and '...'
            # (all three forms are rendered as [...] in the output)
            record.msg = re.sub(
                r"\[([^\]]+)\]|\"([^\"]+)\"|\'([^\']+)\'",
                lambda match: f"[{value_color}{match.group(1) or match.group(2) or match.group(3)}{colorama.Fore.RESET}{msg_color}]",
                str(record.msg),
            )

            # colorize message
            record.msg = f"{msg_color}{record.msg}{colorama.Style.RESET_ALL}"

            return super().format(record)

    formatter = CustomFormatter("%(levelname)s %(message)s")

    # handler for DEBUG and INFO records
    # NOTE(review): despite its name this handler writes to sys.stderr as well - confirm this is intended
    stdout_log = logging.StreamHandler(sys.stderr)
    stdout_log.setLevel(logging.DEBUG)
    # the filter is the class object itself (not an instance), so `filter` is invoked as a
    # plain one-argument function receiving the record
    stdout_log.addFilter(type("", (logging.Filter,), {
        "filter": lambda rec: rec.levelno <= logging.INFO
    }))
    stdout_log.setFormatter(formatter)
    LOG_ROOT.addHandler(stdout_log)

    # handler for WARNING and above
    stderr_log = logging.StreamHandler(sys.stderr)
    stderr_log.setLevel(logging.WARNING)
    stderr_log.setFormatter(formatter)
    LOG_ROOT.addHandler(stderr_log)
|
||||
|
||||
|
||||
def flush_all_handlers() -> None:
    """
    Flushes all handlers that are attached to the root logger.
    """
    for handler in LOG_ROOT.handlers:
        handler.flush()
|
||||
|
||||
|
||||
def get_logger(name: str | None = None) -> logging.Logger:
    """
    Returns a localized logger

    Note: on every call a new logger class is defined and installed process-wide via
    `logging.setLoggerClass`, before the logger is obtained via `logging.getLogger(name)`.
    """

    class TranslatingLogger(logging.Logger):

        def _log(self, level: int, msg: object, *args: Any, **kwargs: Any) -> None:
            if level != logging.DEBUG:  # debug messages should not be translated
                # depth 2 skips this _log() frame and its direct caller (presumably the
                # info()/warning()/... method of logging.Logger), so that the translation
                # is looked up in the context of the code that issued the log call
                msg = i18n.translate(msg, reflect.get_caller(2))
            super()._log(level, msg, *args, **kwargs)

    logging.setLoggerClass(TranslatingLogger)
    return logging.getLogger(name)
|
||||
|
||||
|
||||
def is_debug(logger:Logger) -> bool:
    """
    Returns True if the given logger is enabled for level DEBUG.
    """
    return logger.isEnabledFor(logging.DEBUG)
|
||||
90
src/kleinanzeigen_bot/utils/misc.py
Normal file
90
src/kleinanzeigen_bot/utils/misc.py
Normal file
@@ -0,0 +1,90 @@
|
||||
"""
|
||||
SPDX-FileCopyrightText: © Sebastian Thomschke and contributors
|
||||
SPDX-License-Identifier: AGPL-3.0-or-later
|
||||
SPDX-ArtifactOfProjectHomePage: https://github.com/Second-Hand-Friends/kleinanzeigen-bot/
|
||||
"""
|
||||
import asyncio, decimal, re, sys, time
|
||||
from collections.abc import Callable
|
||||
from datetime import datetime
|
||||
from gettext import gettext as _
|
||||
from typing import Any, TypeVar
|
||||
|
||||
# https://mypy.readthedocs.io/en/stable/generics.html#generic-functions
|
||||
T = TypeVar('T')
|
||||
|
||||
|
||||
def ensure(condition:Any | bool | Callable[[], bool], error_message:str, timeout:float = 5, poll_requency:float = 0.5) -> None:
|
||||
"""
|
||||
:param timeout: timespan in seconds until when the condition must become `True`, default is 5 seconds
|
||||
:param poll_requency: sleep interval between calls in seconds, default is 0.5 seconds
|
||||
:raises AssertionError: if condition did not come `True` within given timespan
|
||||
"""
|
||||
if not isinstance(condition, Callable): # type: ignore[arg-type] # https://github.com/python/mypy/issues/6864
|
||||
if condition:
|
||||
return
|
||||
raise AssertionError(_(error_message))
|
||||
|
||||
if timeout < 0:
|
||||
raise AssertionError("[timeout] must be >= 0")
|
||||
if poll_requency < 0:
|
||||
raise AssertionError("[poll_requency] must be >= 0")
|
||||
|
||||
start_at = time.time()
|
||||
while not condition(): # type: ignore[operator]
|
||||
elapsed = time.time() - start_at
|
||||
if elapsed >= timeout:
|
||||
raise AssertionError(_(error_message))
|
||||
time.sleep(poll_requency)
|
||||
|
||||
|
||||
def is_frozen() -> bool:
    """
    Returns the value of the `sys.frozen` attribute, or False if the attribute does not exist.
    (the attribute is typically set by application bundlers such as PyInstaller)

    >>> is_frozen()
    False
    """
    return getattr(sys, "frozen", False)
|
||||
|
||||
|
||||
async def ainput(prompt: str) -> str:
    """
    Async variant of the built-in `input()`: the blocking call is executed in a worker thread
    via `asyncio.to_thread`. A space is appended to the prompt.
    """
    return await asyncio.to_thread(input, f'{prompt} ')
|
||||
|
||||
|
||||
def parse_decimal(number:float | int | str) -> decimal.Decimal:
|
||||
"""
|
||||
>>> parse_decimal(5)
|
||||
Decimal('5')
|
||||
>>> parse_decimal(5.5)
|
||||
Decimal('5.5')
|
||||
>>> parse_decimal("5.5")
|
||||
Decimal('5.5')
|
||||
>>> parse_decimal("5,5")
|
||||
Decimal('5.5')
|
||||
>>> parse_decimal("1.005,5")
|
||||
Decimal('1005.5')
|
||||
>>> parse_decimal("1,005.5")
|
||||
Decimal('1005.5')
|
||||
"""
|
||||
try:
|
||||
return decimal.Decimal(number)
|
||||
except decimal.InvalidOperation as ex:
|
||||
parts = re.split("[.,]", str(number))
|
||||
try:
|
||||
return decimal.Decimal("".join(parts[:-1]) + "." + parts[-1])
|
||||
except decimal.InvalidOperation:
|
||||
raise decimal.DecimalException(f"Invalid number format: {number}") from ex
|
||||
|
||||
|
||||
def parse_datetime(date:datetime | str | None) -> datetime | None:
|
||||
"""
|
||||
>>> parse_datetime(datetime(2020, 1, 1, 0, 0))
|
||||
datetime.datetime(2020, 1, 1, 0, 0)
|
||||
>>> parse_datetime("2020-01-01T00:00:00")
|
||||
datetime.datetime(2020, 1, 1, 0, 0)
|
||||
>>> parse_datetime(None)
|
||||
|
||||
"""
|
||||
if date is None:
|
||||
return None
|
||||
if isinstance(date, datetime):
|
||||
return date
|
||||
return datetime.fromisoformat(date)
|
||||
20
src/kleinanzeigen_bot/utils/net.py
Normal file
20
src/kleinanzeigen_bot/utils/net.py
Normal file
@@ -0,0 +1,20 @@
|
||||
"""
|
||||
SPDX-FileCopyrightText: © Sebastian Thomschke and contributors
|
||||
SPDX-License-Identifier: AGPL-3.0-or-later
|
||||
SPDX-ArtifactOfProjectHomePage: https://github.com/Second-Hand-Friends/kleinanzeigen-bot/
|
||||
"""
|
||||
import socket
|
||||
|
||||
|
||||
def is_port_open(host:str, port:int) -> bool:
    """
    Checks if a TCP connection (IPv4) to the given host and port can be established within 1 second.

    :return: True if the connection succeeded, False on any error
    """
    try:
        # the context manager closes the socket in all cases
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
            sock.settimeout(1)
            sock.connect((host, port))
            return True
    except Exception:
        return False
|
||||
26
src/kleinanzeigen_bot/utils/reflect.py
Normal file
26
src/kleinanzeigen_bot/utils/reflect.py
Normal file
@@ -0,0 +1,26 @@
|
||||
"""
|
||||
SPDX-FileCopyrightText: © Sebastian Thomschke and contributors
|
||||
SPDX-License-Identifier: AGPL-3.0-or-later
|
||||
SPDX-ArtifactOfProjectHomePage: https://github.com/Second-Hand-Friends/kleinanzeigen-bot/
|
||||
"""
|
||||
import inspect
|
||||
from typing import Any
|
||||
|
||||
|
||||
def get_caller(depth: int = 1) -> inspect.FrameInfo | None:
|
||||
stack = inspect.stack()
|
||||
try:
|
||||
for frame in stack[depth + 1:]:
|
||||
if frame.function and frame.function != "<lambda>":
|
||||
return frame
|
||||
return None
|
||||
finally:
|
||||
del stack # Clean up the stack to avoid reference cycles
|
||||
|
||||
|
||||
def is_integer(obj:Any) -> bool:
    """
    Checks if the given object can be converted to an int via `int(obj)`.

    >>> is_integer("5")
    True
    >>> is_integer("5.5")
    False
    >>> is_integer(None)
    False
    >>> is_integer(float("inf"))
    False
    """
    try:
        int(obj)
    except (ValueError, TypeError, OverflowError):
        # OverflowError is raised by int() for float infinities
        return False
    return True
|
||||
567
src/kleinanzeigen_bot/utils/web_scraping_mixin.py
Normal file
567
src/kleinanzeigen_bot/utils/web_scraping_mixin.py
Normal file
@@ -0,0 +1,567 @@
|
||||
"""
|
||||
SPDX-FileCopyrightText: © Sebastian Thomschke and contributors
|
||||
SPDX-License-Identifier: AGPL-3.0-or-later
|
||||
SPDX-ArtifactOfProjectHomePage: https://github.com/Second-Hand-Friends/kleinanzeigen-bot/
|
||||
"""
|
||||
import asyncio, enum, inspect, json, os, platform, secrets, shutil, time
|
||||
from collections.abc import Callable, Coroutine, Iterable
|
||||
from gettext import gettext as _
|
||||
from typing import cast, Any, Final
|
||||
|
||||
try:
|
||||
from typing import Never # type: ignore[attr-defined,unused-ignore] # mypy
|
||||
except ImportError:
|
||||
from typing import NoReturn as Never # Python <3.11
|
||||
|
||||
import nodriver, psutil
|
||||
from nodriver.core.browser import Browser
|
||||
from nodriver.core.config import Config
|
||||
from nodriver.core.element import Element
|
||||
from nodriver.core.tab import Tab as Page
|
||||
|
||||
from . import loggers, net
|
||||
from .misc import ensure, T
|
||||
|
||||
__all__ = [
|
||||
"Browser",
|
||||
"BrowserConfig",
|
||||
"By",
|
||||
"Element",
|
||||
"Page",
|
||||
"Is",
|
||||
"WebScrapingMixin",
|
||||
]
|
||||
|
||||
LOG:Final[loggers.Logger] = loggers.get_logger(__name__)
|
||||
|
||||
# see https://api.jquery.com/category/selectors/
# translation table for str.translate() that prefixes each of the listed meta characters with a backslash
METACHAR_ESCAPER:Final[dict[int, str]] = str.maketrans({ch: f'\\{ch}' for ch in '!"#$%&\'()*+,./:;<=>?@[\\]^`{|}~'})
|
||||
|
||||
|
||||
class By(enum.Enum):
    """
    Selector types; presumably used to specify how web elements are located
    (the consuming code is not part of this excerpt).
    """
    ID = enum.auto()
    CLASS_NAME = enum.auto()
    CSS_SELECTOR = enum.auto()
    TAG_NAME = enum.auto()
    TEXT = enum.auto()
    XPATH = enum.auto()
|
||||
|
||||
|
||||
class Is(enum.Enum):
    """
    Element states; presumably used to query/await the state of web elements
    (the consuming code is not part of this excerpt).
    """
    CLICKABLE = enum.auto()
    DISPLAYED = enum.auto()
    DISABLED = enum.auto()
    READONLY = enum.auto()
    SELECTED = enum.auto()
|
||||
|
||||
|
||||
class BrowserConfig:
    """
    Settings that control how WebScrapingMixin.create_browser_session() launches or attaches to the browser.
    """

    def __init__(self) -> None:
        # additional command line arguments for the browser; --remote-debugging-host/-port are
        # also evaluated to attach to an already running browser
        self.arguments:Iterable[str] = []
        # path of the browser executable; if not set a compatible browser is auto-detected
        self.binary_location:str | None = None
        # paths of browser extension files to be loaded
        self.extensions:Iterable[str] = []
        # if True the browser is started in private mode (-inprivate for Edge, --incognito otherwise)
        self.use_private_window:bool = True
        # browser user data directory, passed to the nodriver Config
        self.user_data_dir:str = ""
        # browser profile name, passed as --profile-directory; "Default" is used for the preferences file if empty
        self.profile_name:str = ""
|
||||
|
||||
|
||||
class WebScrapingMixin:
|
||||
|
||||
def __init__(self) -> None:
    """
    Initializes the mixin with a default browser configuration and without an active browser session.
    """
    self.browser_config:Final[BrowserConfig] = BrowserConfig()
    # both are None until a session exists; `browser` is set by create_browser_session()
    self.browser:Browser = None  # pyright: ignore
    self.page:Page = None  # pyright: ignore
|
||||
|
||||
async def create_browser_session(self) -> None:
    """
    Starts a new browser process, or attaches to an already running one, and stores the session in `self.browser`.

    If the configured browser arguments contain `--remote-debugging-port=<port>` with a port > 0,
    the session attaches to the browser process listening at that host/port
    (host defaults to 127.0.0.1 unless `--remote-debugging-host=` is given).
    Otherwise a new browser instance is configured and launched.

    :raises AssertionError: (via `ensure`) if the configured browser binary or an extension file
        does not exist, or if the remote browser process is not reachable
    """
    LOG.info("Creating Browser session...")

    if self.browser_config.binary_location:
        ensure(os.path.exists(self.browser_config.binary_location), f"Specified browser binary [{self.browser_config.binary_location}] does not exist.")
    else:
        # no binary configured -> auto-detect an installed browser
        self.browser_config.binary_location = self.get_compatible_browser()
    LOG.info(" -> Browser binary location: %s", self.browser_config.binary_location)

    ########################################################
    # check if an existing browser instance shall be used...
    ########################################################
    remote_host = "127.0.0.1"
    remote_port = 0
    for arg in self.browser_config.arguments:
        if arg.startswith("--remote-debugging-host="):
            remote_host = arg.split("=", 2)[1]
        if arg.startswith("--remote-debugging-port="):
            remote_port = int(arg.split("=", 2)[1])

    if remote_port > 0:
        LOG.info("Using existing browser process at %s:%s", remote_host, remote_port)
        ensure(net.is_port_open(remote_host, remote_port),
            f"Browser process not reachable at {remote_host}:{remote_port}. " +
            f"Start the browser with --remote-debugging-port={remote_port} or remove this port from your config.yaml")
        cfg = Config(
            browser_executable_path = self.browser_config.binary_location  # actually not necessary but nodriver fails without
        )
        cfg.host = remote_host
        cfg.port = remote_port
        self.browser = await nodriver.start(cfg)
        LOG.info("New Browser session is %s", self.browser.websocket_url)
        return

    ########################################################
    # configure and initialize new browser instance...
    ########################################################

    # default_browser_args: @ https://github.com/ultrafunkamsterdam/nodriver/blob/main/nodriver/core/config.py
    # https://peter.sh/experiments/chromium-command-line-switches/
    # https://github.com/GoogleChrome/chrome-launcher/blob/main/docs/chrome-flags-for-tools.md
    browser_args = [
        # "--disable-dev-shm-usage", # https://stackoverflow.com/a/50725918/5116073
        "--disable-crash-reporter",
        "--disable-domain-reliability",
        "--disable-sync",
        "--no-experiments",
        "--disable-search-engine-choice-screen",

        "--disable-features=MediaRouter",
        "--use-mock-keychain",

        "--test-type",  # https://stackoverflow.com/a/36746675/5116073
        # https://chromium.googlesource.com/chromium/src/+/master/net/dns/README.md#request-remapping
        # NOTE(review): the double quotes are part of the argument value - confirm the browser accepts them
        '--host-resolver-rules="MAP connect.facebook.net 127.0.0.1, MAP securepubads.g.doubleclick.net 127.0.0.1, MAP www.googletagmanager.com 127.0.0.1"'
    ]

    # Edge is detected by the path of the binary; it uses a different private mode switch
    is_edge = "edge" in self.browser_config.binary_location.lower()

    if is_edge:
        os.environ["MSEDGEDRIVER_TELEMETRY_OPTOUT"] = "1"  # https://docs.microsoft.com/en-us/microsoft-edge/privacy-whitepaper/#microsoft-edge-driver

    if self.browser_config.use_private_window:
        browser_args.append("-inprivate" if is_edge else "--incognito")

    if self.browser_config.profile_name:
        LOG.info(" -> Browser profile name: %s", self.browser_config.profile_name)
        browser_args.append(f"--profile-directory={self.browser_config.profile_name}")

    # user supplied arguments are appended after the built-in ones
    for browser_arg in self.browser_config.arguments:
        LOG.info(" -> Custom Browser argument: %s", browser_arg)
        browser_args.append(browser_arg)

    if not loggers.is_debug(LOG):
        browser_args.append("--log-level=3")  # INFO: 0, WARNING: 1, ERROR: 2, FATAL: 3

    if self.browser_config.user_data_dir:
        LOG.info(" -> Browser user data dir: %s", self.browser_config.user_data_dir)

    cfg = Config(
        headless = False,
        browser_executable_path = self.browser_config.binary_location,
        browser_args = browser_args,
        user_data_dir = self.browser_config.user_data_dir
    )

    # already logged by nodriver:
    # LOG.debug("-> Effective browser arguments: \n\t\t%s", "\n\t\t".join(cfg.browser_args))

    # write initial browser preferences, but only if the profile has no Preferences file yet
    # (the user data dir is read back from cfg rather than from self.browser_config)
    profile_dir = os.path.join(cfg.user_data_dir, self.browser_config.profile_name or "Default")
    os.makedirs(profile_dir, exist_ok = True)
    prefs_file = os.path.join(profile_dir, "Preferences")
    if not os.path.exists(prefs_file):
        LOG.info(" -> Setting chrome prefs [%s]...", prefs_file)
        with open(prefs_file, "w", encoding = 'UTF-8') as fd:
            json.dump({
                "credentials_enable_service": False,
                "enable_do_not_track": True,
                "google": {
                    "services": {
                        "consented_to_sync": False
                    }
                },
                "profile": {
                    "default_content_setting_values": {
                        "popups": 0,
                        "notifications": 2  # 1 = allow, 2 = block browser notifications
                    },
                    "password_manager_enabled": False
                },
                "signin": {
                    "allowed": False
                },
                "translate_site_blacklist": [
                    "www.kleinanzeigen.de"
                ],
                "devtools": {
                    "preferences": {
                        "currentDockState": '"bottom"'
                    }
                }
            }, fd)

    # load extensions
    for crx_extension in self.browser_config.extensions:
        LOG.info(" -> Adding Browser extension: [%s]", crx_extension)
        ensure(os.path.exists(crx_extension), f"Configured extension-file [{crx_extension}] does not exist.")
        cfg.add_extension(crx_extension)

    self.browser = await nodriver.start(cfg)
    LOG.info("New Browser session is %s", self.browser.websocket_url)
|
||||
|
||||
def close_browser_session(self) -> None:
    """
    Stops the current browser session, if there is one.

    The child processes of the browser process are collected before the browser is stopped;
    any of them that is still running afterwards is killed. Finally `self.page` and
    `self.browser` are reset to None.
    """
    if not self.browser:
        return

    LOG.debug("Closing Browser session...")
    self.page = None  # pyright: ignore
    # the children must be looked up while the parent browser process is still known
    spawned_processes:list[psutil.Process] = psutil.Process(self.browser._process_pid).children()  # pylint: disable=protected-access
    self.browser.stop()
    for spawned in spawned_processes:
        if spawned.is_running():
            spawned.kill()  # terminate orphaned browser processes
    self.browser = None  # pyright: ignore
|
||||
|
||||
def get_compatible_browser(self) -> str:
|
||||
match platform.system():
|
||||
case "Linux":
|
||||
browser_paths = [
|
||||
shutil.which("chromium"),
|
||||
shutil.which("chromium-browser"),
|
||||
shutil.which("google-chrome"),
|
||||
shutil.which("microsoft-edge")
|
||||
]
|
||||
|
||||
case "Darwin":
|
||||
browser_paths = [
|
||||
"/Applications/Chromium.app/Contents/MacOS/Chromium",
|
||||
"/Applications/Google Chrome.app/Contents/MacOS/Google Chrome",
|
||||
"/Applications/Microsoft Edge.app/Contents/MacOS/Microsoft Edge",
|
||||
]
|
||||
|
||||
case "Windows":
|
||||
browser_paths = [
|
||||
os.environ.get("ProgramFiles", "C:\\Program Files") + r'\Microsoft\Edge\Application\msedge.exe',
|
||||
os.environ.get("ProgramFiles(x86)", "C:\\Program Files (x86)") + r'\Microsoft\Edge\Application\msedge.exe',
|
||||
|
||||
os.environ["ProgramFiles"] + r'\Chromium\Application\chrome.exe',
|
||||
os.environ["ProgramFiles(x86)"] + r'\Chromium\Application\chrome.exe',
|
||||
os.environ["LOCALAPPDATA"] + r'\Chromium\Application\chrome.exe',
|
||||
|
||||
os.environ["ProgramFiles"] + r'\Chrome\Application\chrome.exe',
|
||||
os.environ["ProgramFiles(x86)"] + r'\Chrome\Application\chrome.exe',
|
||||
os.environ["LOCALAPPDATA"] + r'\Chrome\Application\chrome.exe',
|
||||
|
||||
shutil.which("msedge.exe"),
|
||||
shutil.which("chromium.exe"),
|
||||
shutil.which("chrome.exe")
|
||||
]
|
||||
|
||||
case _ as os_name:
|
||||
raise AssertionError(_("Installed browser for OS %s could not be detected") % os_name)
|
||||
|
||||
for browser_path in browser_paths:
|
||||
if browser_path and os.path.isfile(browser_path):
|
||||
return browser_path
|
||||
|
||||
raise AssertionError(_("Installed browser could not be detected"))
|
||||
|
||||
async def web_await(self, condition: Callable[[], T | Never | Coroutine[Any, Any, T | Never]], *,
|
||||
timeout:int | float = 5, timeout_error_message: str = "") -> T:
|
||||
"""
|
||||
Blocks/waits until the given condition is met.
|
||||
|
||||
:param timeout: timeout in seconds
|
||||
:raises TimeoutError: if element could not be found within time
|
||||
"""
|
||||
loop = asyncio.get_running_loop()
|
||||
start_at = loop.time()
|
||||
|
||||
while True:
|
||||
await self.page
|
||||
ex:Exception | None = None
|
||||
try:
|
||||
result_raw = condition()
|
||||
result:T = (await result_raw) if inspect.isawaitable(result_raw) else result_raw
|
||||
if result:
|
||||
return result
|
||||
except Exception as ex1:
|
||||
ex = ex1
|
||||
if loop.time() - start_at > timeout:
|
||||
if ex:
|
||||
raise ex
|
||||
raise TimeoutError(timeout_error_message or f"Condition not met within {timeout} seconds")
|
||||
await self.page.sleep(0.5)
|
||||
|
||||
async def web_check(self, selector_type:By, selector_value:str, attr:Is, *, timeout:int | float = 5) -> bool:
|
||||
"""
|
||||
Locates an HTML element and returns a state.
|
||||
|
||||
:param timeout: timeout in seconds
|
||||
:raises TimeoutError: if element could not be found within time
|
||||
"""
|
||||
|
||||
def is_disabled(elem:Element) -> bool:
|
||||
return elem.attrs.get("disabled") is not None
|
||||
|
||||
async def is_displayed(elem:Element) -> bool:
|
||||
return cast(bool, await elem.apply("""
|
||||
function (element) {
|
||||
var style = window.getComputedStyle(element);
|
||||
return style.display !== 'none'
|
||||
&& style.visibility !== 'hidden'
|
||||
&& style.opacity !== '0'
|
||||
&& element.offsetWidth > 0
|
||||
&& element.offsetHeight > 0
|
||||
}
|
||||
"""))
|
||||
|
||||
elem:Element = await self.web_find(selector_type, selector_value, timeout = timeout)
|
||||
|
||||
match attr:
|
||||
case Is.CLICKABLE:
|
||||
return not is_disabled(elem) or await is_displayed(elem)
|
||||
case Is.DISPLAYED:
|
||||
return await is_displayed(elem)
|
||||
case Is.DISABLED:
|
||||
return is_disabled(elem)
|
||||
case Is.READONLY:
|
||||
return elem.attrs.get("readonly") is not None
|
||||
case Is.SELECTED:
|
||||
return cast(bool, await elem.apply("""
|
||||
function (element) {
|
||||
if (element.tagName.toLowerCase() === 'input') {
|
||||
if (element.type === 'checkbox' || element.type === 'radio') {
|
||||
return element.checked
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
"""))
|
||||
raise AssertionError(_("Unsupported attribute: %s") % attr)
|
||||
|
||||
async def web_click(self, selector_type:By, selector_value:str, *, timeout:int | float = 5) -> Element:
    """
    Locates an HTML element, clicks it and pauses afterwards via `web_sleep()`.

    :param timeout: timeout in seconds for locating the element
    :return: the clicked element
    :raises TimeoutError: if element could not be found within time
    """
    target = await self.web_find(selector_type, selector_value, timeout = timeout)
    await target.click()
    await self.web_sleep()
    return target
|
||||
|
||||
async def web_execute(self, javascript:str) -> Any:
    """
    Evaluates the given JavaScript code on the current page, awaiting a returned promise.

    :param javascript: the JavaScript code to evaluate
    :return: The javascript's return value
    """
    outcome = await self.page.evaluate(javascript, await_promise = True)
    return outcome
|
||||
|
||||
async def web_find(self, selector_type:By, selector_value:str, *, parent:Element = None, timeout:int | float = 5) -> Element:
    """
    Locates the first HTML element matching the given selector type and value.

    :param parent: optional element passed on to the selector query;
        rejected via `ensure()` for By.TEXT and By.XPATH
    :param timeout: timeout in seconds
    :raises TimeoutError: if element could not be found within time
    :raises AssertionError: if the selector type is not supported
    """

    def css_lookup(css_selector:str) -> Callable[[], Any]:
        return lambda: self.page.query_selector(css_selector, parent)

    def text_lookup() -> Callable[[], Any]:
        ensure(not parent, f"Specifying a parent element currently not supported with selector type: {selector_type}")
        return lambda: self.page.find_element_by_text(selector_value, True)

    if selector_type == By.ID:
        lookup = css_lookup(f"#{selector_value.translate(METACHAR_ESCAPER)}")
        not_found = f"No HTML element found with ID '{selector_value}' within {timeout} seconds."
    elif selector_type == By.CLASS_NAME:
        lookup = css_lookup(f".{selector_value.translate(METACHAR_ESCAPER)}")
        not_found = f"No HTML element found with CSS class '{selector_value}' within {timeout} seconds."
    elif selector_type == By.TAG_NAME:
        lookup = css_lookup(selector_value)
        not_found = f"No HTML element found of tag <{selector_value}> within {timeout} seconds."
    elif selector_type == By.CSS_SELECTOR:
        lookup = css_lookup(selector_value)
        not_found = f"No HTML element found using CSS selector '{selector_value}' within {timeout} seconds."
    elif selector_type == By.TEXT:
        lookup = text_lookup()
        not_found = f"No HTML element found containing text '{selector_value}' within {timeout} seconds."
    elif selector_type == By.XPATH:
        # XPath expressions are handed to the same text search as By.TEXT
        lookup = text_lookup()
        not_found = f"No HTML element found using XPath '{selector_value}' within {timeout} seconds."
    else:
        raise AssertionError(_("Unsupported selector type: %s") % selector_type)

    return await self.web_await(lookup, timeout = timeout, timeout_error_message = not_found)
|
||||
|
||||
async def web_find_all(self, selector_type:By, selector_value:str, *, parent:Element = None, timeout:int | float = 5) -> list[Element]:
    """
    Locates all HTML elements matching the given selector type and value.

    Supported selector types are By.CLASS_NAME, By.CSS_SELECTOR, By.TAG_NAME, By.TEXT and By.XPATH.

    :param parent: optional element passed on to the selector query;
        rejected via `ensure()` for By.TEXT and By.XPATH
    :param timeout: timeout in seconds
    :raises TimeoutError: if no element could be found within time
    :raises AssertionError: if the selector type is not supported
    """

    def css_lookup(css_selector:str) -> Callable[[], Any]:
        return lambda: self.page.query_selector_all(css_selector, parent)

    def text_lookup() -> Callable[[], Any]:
        ensure(not parent, f"Specifying a parent element currently not supported with selector type: {selector_type}")
        return lambda: self.page.find_elements_by_text(selector_value)

    if selector_type == By.CLASS_NAME:
        lookup = css_lookup(f".{selector_value.translate(METACHAR_ESCAPER)}")
        not_found = f"No HTML elements found with CSS class '{selector_value}' within {timeout} seconds."
    elif selector_type == By.CSS_SELECTOR:
        lookup = css_lookup(selector_value)
        not_found = f"No HTML elements found using CSS selector '{selector_value}' within {timeout} seconds."
    elif selector_type == By.TAG_NAME:
        lookup = css_lookup(selector_value)
        not_found = f"No HTML elements found of tag <{selector_value}> within {timeout} seconds."
    elif selector_type == By.TEXT:
        lookup = text_lookup()
        not_found = f"No HTML elements found containing text '{selector_value}' within {timeout} seconds."
    elif selector_type == By.XPATH:
        # XPath expressions are handed to the same text search as By.TEXT
        lookup = text_lookup()
        not_found = f"No HTML elements found using XPath '{selector_value}' within {timeout} seconds."
    else:
        raise AssertionError(_("Unsupported selector type: %s") % selector_type)

    return await self.web_await(lookup, timeout = timeout, timeout_error_message = not_found)
|
||||
|
||||
async def web_input(self, selector_type:By, selector_value:str, text:str | int, *, timeout:int | float = 5) -> Element:
    """
    Replaces the content of an HTML input field with the given text and pauses afterwards via `web_sleep()`.

    :param text: the value to enter, it is converted to a string first
    :param timeout: timeout in seconds for locating the field
    :return: the input field
    :raises TimeoutError: if element could not be found within time
    """
    field = await self.web_find(selector_type, selector_value, timeout = timeout)
    await field.clear_input()  # drop the existing content before typing
    typed_value = str(text)
    await field.send_keys(typed_value)
    await self.web_sleep()
    return field
|
||||
|
||||
async def web_open(self, url:str, *, timeout:int | float = 15000, reload_if_already_open:bool = False) -> None:
|
||||
"""
|
||||
:param url: url to open in browser
|
||||
:param timeout: timespan in seconds within the page needs to be loaded
|
||||
:param reload_if_already_open: if False does nothing if the URL is already open in the browser
|
||||
:raises TimeoutException: if page did not open within given timespan
|
||||
"""
|
||||
LOG.debug(" -> Opening [%s]...", url)
|
||||
if not reload_if_already_open and self.page and url == self.page.url:
|
||||
LOG.debug(" => skipping, [%s] is already open", url)
|
||||
return
|
||||
self.page = await self.browser.get(url, False, False)
|
||||
await self.web_await(lambda: self.web_execute("document.readyState == 'complete'"), timeout = timeout,
|
||||
timeout_error_message = f"Page did not finish loading within {timeout} seconds.")
|
||||
|
||||
async def web_text(self, selector_type:By, selector_value:str, *, parent:Element = None, timeout:int | float = 5) -> str:
    """
    Returns the trimmed text of an HTML element as the browser's selection API reports it.

    The element is temporarily selected on the page, the selection is converted to a string
    and the selection is cleared again.

    :param parent: optional element forwarded to `web_find()`
    :param timeout: timeout in seconds for locating the element
    :raises TimeoutError: if element could not be found within time
    """
    element = await self.web_find(selector_type, selector_value, parent = parent, timeout = timeout)
    selected_text = await element.apply("""
        function (elem) {
            let sel = window.getSelection()
            sel.removeAllRanges()
            let range = document.createRange()
            range.selectNode(elem)
            sel.addRange(range)
            let visibleText = sel.toString().trim()
            sel.removeAllRanges()
            return visibleText
        }
    """)
    return str(selected_text)
|
||||
|
||||
async def web_sleep(self, min_ms:int = 1000, max_ms:int = 2500) -> None:
    """
    Pauses for a random duration between `min_ms` (inclusive) and `max_ms` (exclusive).

    If `max_ms` is not greater than `min_ms` the pause takes exactly `min_ms` milliseconds.
    Pauses longer than 1500 ms are logged at INFO level, shorter ones at DEBUG level.

    :param min_ms: minimum pause in milliseconds
    :param max_ms: upper bound of the pause in milliseconds
    """
    # A conditional expression instead of "cond and a or b": the latter fell through to
    # secrets.randbelow() with a non-positive bound (ValueError) whenever min_ms was 0.
    duration = min_ms if max_ms <= min_ms else secrets.randbelow(max_ms - min_ms) + min_ms
    LOG.log(loggers.INFO if duration > 1500 else loggers.DEBUG, " ... pausing for %d ms ...", duration)
    await self.page.sleep(duration / 1000)
|
||||
|
||||
async def web_request(self, url:str, method:str = "GET", valid_response_codes:int | Iterable[int] = 200,
        headers:dict[str, str] | None = None) -> dict[str, Any]:
    """
    Sends an HTTP request from within the current page using the browser's `fetch()` function.

    :param url: the URL to request
    :param method: the HTTP method, it is upper-cased before use
    :param valid_response_codes: the status code(s) that are accepted
    :param headers: optional request headers
    :return: a dict with the keys `statusCode`, `statusMessage`, `headers` and `content`
    :raises Exception: whatever `ensure()` raises if the status code is not accepted
    """
    method = method.upper()
    LOG.debug(" -> HTTP %s [%s]...", method, url)

    # JSON literals are valid JavaScript literals, so serializing the values keeps quotes,
    # backslashes etc. inside them from breaking (or altering) the generated script
    js_url = json.dumps(url)
    js_method = json.dumps(method)
    js_headers = json.dumps(headers or {})

    response = cast(dict[str, Any], await self.page.evaluate(f"""
        fetch({js_url}, {{
            method: {js_method},
            redirect: "follow",
            headers: {js_headers}
        }})
        .then(response => response.text().then(responseText => {{
            headers = {{}};
            response.headers.forEach((v, k) => headers[k] = v);
            return {{
                statusCode: response.status,
                statusMessage: response.statusText,
                headers: headers,
                content: responseText
            }}
        }}))
    """, await_promise = True))

    if isinstance(valid_response_codes, int):
        valid_response_codes = [valid_response_codes]
    ensure(
        response["statusCode"] in valid_response_codes,
        f'Invalid response "{response["statusCode"]} {response["statusMessage"]}" received for HTTP {method} to {url}'
    )
    return response
|
||||
# pylint: enable=dangerous-default-value
|
||||
|
||||
async def web_scroll_page_down(self, scroll_length: int = 10, scroll_speed: int = 10000, scroll_back_top: bool = False) -> None:
    """
    Smoothly scrolls the current web page down.

    Between two scroll steps the coroutine sleeps for `scroll_length / scroll_speed` seconds
    (half of that while scrolling back up).

    :param scroll_length: the length of a single scroll iteration, determines smoothness of scrolling, lower is smoother
    :param scroll_speed: the speed of scrolling, higher is faster
    :param scroll_back_top: whether to scroll the page back to the top after scrolling to the bottom
    """
    current_y_pos = 0
    bottom_y_pos: int = await self.web_execute('document.body.scrollHeight')  # get bottom position
    while current_y_pos < bottom_y_pos:  # scroll in steps until bottom reached
        current_y_pos += scroll_length
        await self.web_execute(f'window.scrollTo(0, {current_y_pos})')  # scroll one step
        # asyncio.sleep() instead of time.sleep(): a blocking sleep would stall the event loop
        await asyncio.sleep(scroll_length / scroll_speed)

    if scroll_back_top:  # scroll back to top in same style
        while current_y_pos > 0:
            current_y_pos -= scroll_length
            await self.web_execute(f'window.scrollTo(0, {current_y_pos})')
            await asyncio.sleep(scroll_length / scroll_speed / 2)  # double speed
|
||||
|
||||
async def web_select(self, selector_type:By, selector_value:str, selected_value:Any, timeout:int | float = 5) -> Element:
    """
    Selects an <option/> of a <select/> HTML element.

    Waits until the element is reported as clickable, selects the option whose value equals
    the string form of `selected_value`, dispatches a bubbling `change` event on the element
    and pauses afterwards via `web_sleep()`.

    NOTE(review): the injected script throws if no option matches; whether that error reaches
    the Python caller depends on `Element.apply()` - TODO confirm.

    :param selected_value: the value of the option to select
    :param timeout: timeout in seconds
    :return: the <select/> element
    :raises TimeoutError: if element could not be found within time
    """
    await self.web_await(
        lambda: self.web_check(selector_type, selector_value, Is.CLICKABLE), timeout = timeout,
        timeout_error_message = f"No clickable HTML element with selector: {selector_type}='{selector_value}' found"
    )
    elem = await self.web_find(selector_type, selector_value)

    # a JSON string literal is a valid JavaScript string literal, so quotes or backslashes
    # in the value cannot break the generated script
    wanted_value = json.dumps(str(selected_value))
    await elem.apply(f"""
        function (element) {{
            const wanted = {wanted_value};
            for (let i = 0; i < element.options.length; i++) {{
                if (element.options[i].value == wanted) {{
                    element.selectedIndex = i;
                    element.dispatchEvent(new Event('change', {{ bubbles: true }}));
                    return;  // a "break" would fall through to the error below even on success
                }}
            }}
            throw new Error("Option with value " + wanted + " not found.");
        }}
    """)
    await self.web_sleep()
    return elem
|
||||
Reference in New Issue
Block a user