diff --git a/src/kleinanzeigen_bot/__init__.py b/src/kleinanzeigen_bot/__init__.py index ecc9c69..6ae7511 100644 --- a/src/kleinanzeigen_bot/__init__.py +++ b/src/kleinanzeigen_bot/__init__.py @@ -17,16 +17,18 @@ import certifi, colorama, nodriver from ruamel.yaml import YAML from wcmatch import glob -from . import utils, resources, extract -from .i18n import Locale, get_current_locale, set_current_locale, get_translating_logger, pluralize -from .utils import abspath, ainput, apply_defaults, ensure, is_frozen, safe_get, parse_datetime, calculate_content_hash -from .web_scraping_mixin import By, Element, Page, Is, WebScrapingMixin +from . import extract, resources +from .ads import calculate_content_hash +from .utils import dicts, error_handlers, loggers, misc +from .utils.files import abspath +from .utils.i18n import Locale, get_current_locale, set_current_locale, pluralize +from .utils.misc import ainput, ensure, is_frozen, parse_datetime, parse_decimal +from .utils.web_scraping_mixin import By, Element, Page, Is, WebScrapingMixin from ._version import __version__ # W0406: possibly a bug, see https://github.com/PyCQA/pylint/issues/3933 -LOG_ROOT:Final[logging.Logger] = logging.getLogger() -LOG:Final[logging.Logger] = get_translating_logger(__name__) +LOG:Final[logging.Logger] = loggers.get_logger(__name__) LOG.setLevel(logging.INFO) colorama.just_fix_windows_console() @@ -59,7 +61,8 @@ class KleinanzeigenBot(WebScrapingMixin): def __del__(self) -> None: if self.file_log: - LOG_ROOT.removeHandler(self.file_log) + self.file_log.flush() + loggers.LOG_ROOT.removeHandler(self.file_log) self.file_log.close() self.close_browser_session() @@ -258,7 +261,7 @@ class KleinanzeigenBot(WebScrapingMixin): self.file_log = RotatingFileHandler(filename = self.log_file_path, maxBytes = 10 * 1024 * 1024, backupCount = 10, encoding = "utf-8") self.file_log.setLevel(logging.DEBUG) self.file_log.setFormatter(logging.Formatter("%(asctime)s [%(levelname)s] %(message)s")) - LOG_ROOT.addHandler(self.file_log) + loggers.LOG_ROOT.addHandler(self.file_log) LOG.info("App version: %s", self.get_version()) LOG.info("Python version: %s", sys.version) @@ -333,13 +336,13 @@ class KleinanzeigenBot(WebScrapingMixin): LOG.info('Start fetch task for the ad(s) with id(s):') LOG.info(' | '.join([str(id_) for id_ in ids])) - ad_fields = utils.load_dict_from_module(resources, "ad_fields.yaml") + ad_fields = dicts.load_dict_from_module(resources, "ad_fields.yaml") ads = [] for ad_file, ad_file_relative in sorted(ad_files.items()): - ad_cfg_orig = utils.load_dict(ad_file, "ad") + ad_cfg_orig = dicts.load_dict(ad_file, "ad") ad_cfg = copy.deepcopy(ad_cfg_orig) - apply_defaults(ad_cfg, self.config["ad_defaults"], ignore = lambda k, _: k == "description", override = lambda _, v: v == "") - apply_defaults(ad_cfg, ad_fields) + dicts.apply_defaults(ad_cfg, self.config["ad_defaults"], ignore = lambda k, _: k == "description", override = lambda _, v: v == "") + dicts.apply_defaults(ad_cfg, ad_fields) if ignore_inactive and not ad_cfg["active"]: LOG.info(" -> SKIPPED: inactive ad [%s]", ad_file_relative) @@ -365,13 +368,13 @@ class KleinanzeigenBot(WebScrapingMixin): # pylint: disable=cell-var-from-loop def assert_one_of(path:str, allowed:Iterable[str]) -> None: - ensure(safe_get(ad_cfg, *path.split(".")) in allowed, f"-> property [{path}] must be one of: {allowed} @ [{ad_file}]") + ensure(dicts.safe_get(ad_cfg, *path.split(".")) in allowed, f"-> property [{path}] must be one of: {allowed} @ [{ad_file}]") def assert_min_len(path:str, minlen:int) -> None: - ensure(len(safe_get(ad_cfg, *path.split("."))) >= minlen, f"-> property [{path}] must be at least {minlen} characters long @ [{ad_file}]") + ensure(len(dicts.safe_get(ad_cfg, *path.split("."))) >= minlen, f"-> property [{path}] must be at least {minlen} characters long @ [{ad_file}]") def assert_has_value(path:str) -> None: - ensure(safe_get(ad_cfg, *path.split(".")), f"-> property [{path}] not specified @ [{ad_file}]") + ensure(dicts.safe_get(ad_cfg, *path.split(".")), f"-> property [{path}] not specified @ [{ad_file}]") # pylint: enable=cell-var-from-loop assert_one_of("type", {"OFFER", "WANTED"}) @@ -379,7 +382,7 @@ class KleinanzeigenBot(WebScrapingMixin): assert_has_value("description") assert_one_of("price_type", {"FIXED", "NEGOTIABLE", "GIVE_AWAY", "NOT_APPLICABLE"}) if ad_cfg["price_type"] == "GIVE_AWAY": - ensure(not safe_get(ad_cfg, "price"), f"-> [price] must not be specified for GIVE_AWAY ad @ [{ad_file}]") + ensure(not dicts.safe_get(ad_cfg, "price"), f"-> [price] must not be specified for GIVE_AWAY ad @ [{ad_file}]") elif ad_cfg["price_type"] == "FIXED": assert_has_value("price") @@ -405,7 +408,7 @@ class KleinanzeigenBot(WebScrapingMixin): ad_cfg["category"] = resolved_category_id if ad_cfg["shipping_costs"]: - ad_cfg["shipping_costs"] = str(round(utils.parse_decimal(ad_cfg["shipping_costs"]), 2)) + ad_cfg["shipping_costs"] = str(round(misc.parse_decimal(ad_cfg["shipping_costs"]), 2)) if ad_cfg["images"]: images = [] @@ -433,18 +436,18 @@ class KleinanzeigenBot(WebScrapingMixin): return ads def load_config(self) -> None: - config_defaults = utils.load_dict_from_module(resources, "config_defaults.yaml") - config = utils.load_dict_if_exists(self.config_file_path, _("config")) + config_defaults = dicts.load_dict_from_module(resources, "config_defaults.yaml") + config = dicts.load_dict_if_exists(self.config_file_path, _("config")) if config is None: LOG.warning("Config file %s does not exist. Creating it with default values...", self.config_file_path) - utils.save_dict(self.config_file_path, config_defaults) + dicts.save_dict(self.config_file_path, config_defaults) config = {} - self.config = apply_defaults(config, config_defaults) + self.config = dicts.apply_defaults(config, config_defaults) - self.categories = utils.load_dict_from_module(resources, "categories.yaml", "categories") - deprecated_categories = utils.load_dict_from_module(resources, "categories_old.yaml", "categories") + self.categories = dicts.load_dict_from_module(resources, "categories.yaml", "categories") + deprecated_categories = dicts.load_dict_from_module(resources, "categories_old.yaml", "categories") self.categories.update(deprecated_categories) if self.config["categories"]: self.categories.update(self.config["categories"]) @@ -675,7 +678,7 @@ class KleinanzeigenBot(WebScrapingMixin): await self.web_select(By.CSS_SELECTOR, "select#price-type-react, select#micro-frontend-price-type, select#priceType", price_type) except TimeoutError: pass - if safe_get(ad_cfg, "price"): + if dicts.safe_get(ad_cfg, "price"): await self.web_input(By.CSS_SELECTOR, "input#post-ad-frontend-price, input#micro-frontend-price, input#pstad-price", ad_cfg["price"]) ############################# @@ -797,7 +800,7 @@ class KleinanzeigenBot(WebScrapingMixin): LOG.info(" -> SUCCESS: ad published with ID %s", ad_id) - utils.save_dict(ad_file, ad_cfg_orig) + dicts.save_dict(ad_file, ad_cfg_orig) async def __set_condition(self, condition_value: str) -> None: condition_mapping = { @@ -1047,11 +1050,11 @@ def main(args:list[str]) -> None: https://github.com/Second-Hand-Friends/kleinanzeigen-bot """)[1:], flush = True) # [1:] removes the first empty blank line - utils.configure_console_logging() + loggers.configure_console_logging() - signal.signal(signal.SIGINT, utils.on_sigint) # capture CTRL+C - sys.excepthook = utils.on_exception - atexit.register(utils.on_exit) + signal.signal(signal.SIGINT, error_handlers.on_sigint) # capture CTRL+C + sys.excepthook = error_handlers.on_exception + atexit.register(loggers.flush_all_handlers) bot = KleinanzeigenBot() atexit.register(bot.close_browser_session) @@ -1059,6 +1062,6 @@ def main(args:list[str]) -> None: if __name__ == "__main__": - utils.configure_console_logging() + loggers.configure_console_logging() LOG.error("Direct execution not supported. Use 'pdm run app'") sys.exit(1) diff --git a/src/kleinanzeigen_bot/ads.py b/src/kleinanzeigen_bot/ads.py new file mode 100644 index 0000000..0eb35a2 --- /dev/null +++ b/src/kleinanzeigen_bot/ads.py @@ -0,0 +1,38 @@ +""" +SPDX-FileCopyrightText: © Jens Bergman and contributors +SPDX-License-Identifier: AGPL-3.0-or-later +SPDX-ArtifactOfProjectHomePage: https://github.com/Second-Hand-Friends/kleinanzeigen-bot/ +""" +import json, os, hashlib +from typing import Any + + +def calculate_content_hash(ad_cfg: dict[str, Any]) -> str: + """Calculate a hash for user-modifiable fields of the ad.""" + + # Relevant fields for the hash + content = { + "active": bool(ad_cfg.get("active", True)), # Explicitly convert to bool + "type": str(ad_cfg.get("type", "")), # Explicitly convert to string + "title": str(ad_cfg.get("title", "")), + "description": str(ad_cfg.get("description", "")), + "category": str(ad_cfg.get("category", "")), + "price": str(ad_cfg.get("price", "")), # Price always as string + "price_type": str(ad_cfg.get("price_type", "")), + "special_attributes": dict(ad_cfg.get("special_attributes") or {}), # Handle None case + "shipping_type": str(ad_cfg.get("shipping_type", "")), + "shipping_costs": str(ad_cfg.get("shipping_costs", "")), + "shipping_options": sorted([str(x) for x in (ad_cfg.get("shipping_options") or [])]), # Handle None case + "sell_directly": bool(ad_cfg.get("sell_directly", False)), # Explicitly convert to bool + "images": sorted([os.path.basename(str(img)) if img is not None else "" for img in (ad_cfg.get("images") or [])]), # Handle None values in images + "contact": { + "name": str(ad_cfg.get("contact", {}).get("name", "")), + "street": str(ad_cfg.get("contact", {}).get("street", "")), # Changed from "None" to empty string for consistency + "zipcode": str(ad_cfg.get("contact", {}).get("zipcode", "")), + "phone": str(ad_cfg.get("contact", {}).get("phone", "")) + } + } + + # Create sorted JSON string for consistent hashes + content_str = json.dumps(content, sort_keys = True) + return hashlib.sha256(content_str.encode()).hexdigest() diff --git a/src/kleinanzeigen_bot/extract.py b/src/kleinanzeigen_bot/extract.py index 7d87df9..c8f97a4 100644 --- a/src/kleinanzeigen_bot/extract.py +++ b/src/kleinanzeigen_bot/extract.py @@ -3,22 +3,20 @@ SPDX-FileCopyrightText: © Sebastian Thomschke and contributors SPDX-License-Identifier: AGPL-3.0-or-later SPDX-ArtifactOfProjectHomePage: https://github.com/Second-Hand-Friends/kleinanzeigen-bot/ """ -import logging, os, shutil +import json, mimetypes, os, shutil import urllib.request as urllib_request -import mimetypes from datetime import datetime from typing import Any, Final -import json -from .i18n import get_translating_logger, pluralize -from .utils import is_integer, parse_decimal, save_dict, calculate_content_hash -from .web_scraping_mixin import Browser, By, Element, Is, WebScrapingMixin +from .ads import calculate_content_hash +from .utils import dicts, i18n, loggers, misc, reflect +from .utils.web_scraping_mixin import Browser, By, Element, Is, WebScrapingMixin __all__ = [ "AdExtractor", ] -LOG:Final[logging.Logger] = get_translating_logger(__name__) +LOG:Final[loggers.Logger] = loggers.get_logger(__name__) class AdExtractor(WebScrapingMixin): @@ -56,7 +54,7 @@ class AdExtractor(WebScrapingMixin): # call extraction function info = await self._extract_ad_page_info(new_base_dir, ad_id) ad_file_path = new_base_dir + '/' + f'ad_{ad_id}.yaml' - save_dict(ad_file_path, info) + dicts.save_dict(ad_file_path, info) async def _download_images_from_ad_page(self, directory:str, ad_id:int) -> list[str]: """ @@ -74,7 +72,7 @@ class AdExtractor(WebScrapingMixin): image_box = await self.web_find(By.CLASS_NAME, 'galleryimage-large') n_images = len(await self.web_find_all(By.CSS_SELECTOR, '.galleryimage-element[data-ix]', parent = image_box)) - LOG.info('Found %s.', pluralize("image", n_images)) + LOG.info('Found %s.', i18n.pluralize("image", n_images)) img_element:Element = await self.web_find(By.CSS_SELECTOR, 'div:nth-child(1) > img', parent = image_box) img_fn_prefix = 'ad_' + str(ad_id) + '__img' @@ -106,7 +104,7 @@ class AdExtractor(WebScrapingMixin): LOG.error('NEXT button in image gallery somehow missing, aborting image fetching.') break img_nr += 1 - LOG.info('Downloaded %s.', pluralize("image", dl_counter)) + LOG.info('Downloaded %s.', i18n.pluralize("image", dl_counter)) except TimeoutError: # some ads do not require images LOG.warning('No image area found. Continuing without downloading images.') @@ -193,7 +191,7 @@ class AdExtractor(WebScrapingMixin): Navigates to an ad page specified with an ad ID; or alternatively by a given URL. :return: whether the navigation to the ad page was successful """ - if is_integer(id_or_url): + if reflect.is_integer(id_or_url): # navigate to start page, otherwise page can be None! await self.web_open('https://www.kleinanzeigen.de/') # enter the ad ID into the search bar @@ -349,7 +347,7 @@ class AdExtractor(WebScrapingMixin): elif '€' in shipping_text: shipping_price_parts = shipping_text.split(' ') ship_type = 'SHIPPING' - ship_costs = float(parse_decimal(shipping_price_parts[-2])) + ship_costs = float(misc.parse_decimal(shipping_price_parts[-2])) # reading shipping option from kleinanzeigen # and find the right one by price diff --git a/src/kleinanzeigen_bot/utils.py b/src/kleinanzeigen_bot/utils.py deleted file mode 100644 index 7703d3f..0000000 --- a/src/kleinanzeigen_bot/utils.py +++ /dev/null @@ -1,366 +0,0 @@ -""" -SPDX-FileCopyrightText: © Sebastian Thomschke and contributors -SPDX-License-Identifier: AGPL-3.0-or-later -SPDX-ArtifactOfProjectHomePage: https://github.com/Second-Hand-Friends/kleinanzeigen-bot/ -""" -import asyncio, copy, decimal, inspect, json, logging, os, re, socket, sys, traceback, time, hashlib -from importlib.resources import read_text as get_resource_as_string -from collections.abc import Callable -from datetime import datetime -from gettext import gettext as _ -from types import FrameType, ModuleType, TracebackType -from typing import Any, Final, TypeVar - -import colorama -from ruamel.yaml import YAML -from .i18n import get_translating_logger - -LOG_ROOT:Final[logging.Logger] = logging.getLogger() -LOG:Final[logging.Logger] = get_translating_logger(__name__) - -# https://mypy.readthedocs.io/en/stable/generics.html#generic-functions -T = TypeVar('T') - - -def abspath(relative_path:str, relative_to:str | None = None) -> str: - """ - Makes a given relative path absolute based on another file/folder - """ - if os.path.isabs(relative_path): - return relative_path - - if not relative_to: - return os.path.abspath(relative_path) - - if os.path.isfile(relative_to): - relative_to = os.path.dirname(relative_to) - - return os.path.normpath(os.path.join(relative_to, relative_path)) - - -def ensure(condition:Any | bool | Callable[[], bool], error_message:str, timeout:float = 5, poll_requency:float = 0.5) -> None: - """ - :param timeout: timespan in seconds until when the condition must become `True`, default is 5 seconds - :param poll_requency: sleep interval between calls in seconds, default is 0.5 seconds - :raises AssertionError: if condition did not come `True` within given timespan - """ - if not isinstance(condition, Callable): # type: ignore[arg-type] # https://github.com/python/mypy/issues/6864 - if condition: - return - raise AssertionError(_(error_message)) - - if timeout < 0: - raise AssertionError("[timeout] must be >= 0") - if poll_requency < 0: - raise AssertionError("[poll_requency] must be >= 0") - - start_at = time.time() - while not condition(): # type: ignore[operator] - elapsed = time.time() - start_at - if elapsed >= timeout: - raise AssertionError(_(error_message)) - time.sleep(poll_requency) - - -def get_caller(depth: int = 1) -> inspect.FrameInfo | None: - stack = inspect.stack() - try: - for frame in stack[depth + 1:]: - if frame.function and frame.function != "": - return frame - return None - finally: - del stack # Clean up the stack to avoid reference cycles - - -def is_frozen() -> bool: - """ - >>> is_frozen() - False - """ - return getattr(sys, "frozen", False) - - -def is_integer(obj:Any) -> bool: - try: - int(obj) - return True - except (ValueError, TypeError): - return False - - -def is_port_open(host:str, port:int) -> bool: - try: - s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - s.settimeout(1) - s.connect((host, port)) - return True - except Exception: - return False - finally: - s.close() - - -async def ainput(prompt: str) -> str: - return await asyncio.to_thread(input, f'{prompt} ') - - -def apply_defaults( - target:dict[Any, Any], - defaults:dict[Any, Any], - ignore:Callable[[Any, Any], bool] = lambda _k, _v: False, - override:Callable[[Any, Any], bool] = lambda _k, _v: False -) -> dict[Any, Any]: - """ - >>> apply_defaults({}, {"foo": "bar"}) - {'foo': 'bar'} - >>> apply_defaults({"foo": "foo"}, {"foo": "bar"}) - {'foo': 'foo'} - >>> apply_defaults({"foo": ""}, {"foo": "bar"}) - {'foo': ''} - >>> apply_defaults({}, {"foo": "bar"}, ignore = lambda k, _: k == "foo") - {} - >>> apply_defaults({"foo": ""}, {"foo": "bar"}, override = lambda _, v: v == "") - {'foo': 'bar'} - >>> apply_defaults({"foo": None}, {"foo": "bar"}, override = lambda _, v: v == "") - {'foo': None} - """ - for key, default_value in defaults.items(): - if key in target: - if isinstance(target[key], dict) and isinstance(default_value, dict): - apply_defaults(target[key], default_value, ignore = ignore) - elif override(key, target[key]): - target[key] = copy.deepcopy(default_value) - elif not ignore(key, default_value): - target[key] = copy.deepcopy(default_value) - return target - - -def safe_get(a_map:dict[Any, Any], *keys:str) -> Any: - """ - >>> safe_get({"foo": {}}, "foo", "bar") is None - True - >>> safe_get({"foo": {"bar": "some_value"}}, "foo", "bar") - 'some_value' - """ - if a_map: - for key in keys: - try: - a_map = a_map[key] - except (KeyError, TypeError): - return None - return a_map - - -def configure_console_logging() -> None: - - class CustomFormatter(logging.Formatter): - LEVEL_COLORS = { - logging.DEBUG: colorama.Fore.BLACK + colorama.Style.BRIGHT, - logging.INFO: colorama.Fore.BLACK + colorama.Style.BRIGHT, - logging.WARNING: colorama.Fore.YELLOW, - logging.ERROR: colorama.Fore.RED, - logging.CRITICAL: colorama.Fore.RED, - } - MESSAGE_COLORS = { - logging.DEBUG: colorama.Fore.BLACK + colorama.Style.BRIGHT, - logging.INFO: colorama.Fore.RESET, - logging.WARNING: colorama.Fore.YELLOW, - logging.ERROR: colorama.Fore.RED, - logging.CRITICAL: colorama.Fore.RED + colorama.Style.BRIGHT, - } - VALUE_COLORS = { - logging.DEBUG: colorama.Fore.BLACK + colorama.Style.BRIGHT, - logging.INFO: colorama.Fore.MAGENTA, - logging.WARNING: colorama.Fore.MAGENTA, - logging.ERROR: colorama.Fore.MAGENTA, - logging.CRITICAL: colorama.Fore.MAGENTA, - } - - def format(self, record:logging.LogRecord) -> str: - record = copy.deepcopy(record) - - level_color = self.LEVEL_COLORS.get(record.levelno, "") - msg_color = self.MESSAGE_COLORS.get(record.levelno, "") - value_color = self.VALUE_COLORS.get(record.levelno, "") - - # translate and colorize log level name - levelname = _(record.levelname) if record.levelno > logging.DEBUG else record.levelname - record.levelname = f"{level_color}[{levelname}]{colorama.Style.RESET_ALL}" - - # highlight message values enclosed by [...], "...", and '...' - record.msg = re.sub( - r"\[([^\]]+)\]|\"([^\"]+)\"|\'([^\']+)\'", - lambda match: f"[{value_color}{match.group(1) or match.group(2) or match.group(3)}{colorama.Fore.RESET}{msg_color}]", - str(record.msg), - ) - - # colorize message - record.msg = f"{msg_color}{record.msg}{colorama.Style.RESET_ALL}" - - return super().format(record) - - formatter = CustomFormatter("%(levelname)s %(message)s") - - stdout_log = logging.StreamHandler(sys.stderr) - stdout_log.setLevel(logging.DEBUG) - stdout_log.addFilter(type("", (logging.Filter,), { - "filter": lambda rec: rec.levelno <= logging.INFO - })) - stdout_log.setFormatter(formatter) - LOG_ROOT.addHandler(stdout_log) - - stderr_log = logging.StreamHandler(sys.stderr) - stderr_log.setLevel(logging.WARNING) - stderr_log.setFormatter(formatter) - LOG_ROOT.addHandler(stderr_log) - - -def on_exception(ex_type:type[BaseException], ex_value:Any, ex_traceback:TracebackType | None) -> None: - if issubclass(ex_type, KeyboardInterrupt): - sys.__excepthook__(ex_type, ex_value, ex_traceback) - elif LOG.isEnabledFor(logging.DEBUG) or isinstance(ex_value, (AttributeError, ImportError, NameError, TypeError)): - LOG.error("".join(traceback.format_exception(ex_type, ex_value, ex_traceback))) - elif isinstance(ex_value, AssertionError): - LOG.error(ex_value) - else: - LOG.error("%s: %s", ex_type.__name__, ex_value) - - -def on_exit() -> None: - for handler in LOG_ROOT.handlers: - handler.flush() - - -def on_sigint(_sig:int, _frame:FrameType | None) -> None: - LOG.warning("Aborted on user request.") - sys.exit(0) - - -def load_dict(filepath:str, content_label:str = "") -> dict[str, Any]: - """ - :raises FileNotFoundError - """ - data = load_dict_if_exists(filepath, content_label) - if data is None: - raise FileNotFoundError(filepath) - return data - - -def load_dict_if_exists(filepath:str, content_label:str = "") -> dict[str, Any] | None: - abs_filepath = os.path.abspath(filepath) - LOG.info("Loading %s[%s]...", content_label and content_label + _(" from ") or "", abs_filepath) - - __, file_ext = os.path.splitext(filepath) - if file_ext not in (".json", ".yaml", ".yml"): - raise ValueError(_('Unsupported file type. The filename "%s" must end with *.json, *.yaml, or *.yml') % filepath) - - if not os.path.exists(filepath): - return None - - with open(filepath, encoding = "utf-8") as file: - return json.load(file) if filepath.endswith(".json") else YAML().load(file) # type: ignore[no-any-return] # mypy - - -def load_dict_from_module(module:ModuleType, filename:str, content_label:str = "") -> dict[str, Any]: - """ - :raises FileNotFoundError - """ - LOG.debug("Loading %s[%s.%s]...", content_label and content_label + " from " or "", module.__name__, filename) - - __, file_ext = os.path.splitext(filename) - if file_ext not in (".json", ".yaml", ".yml"): - raise ValueError(f'Unsupported file type. The filename "{filename}" must end with *.json, *.yaml, or *.yml') - - content = get_resource_as_string(module, filename) # pylint: disable=deprecated-method - return json.loads(content) if filename.endswith(".json") else YAML().load(content) # type: ignore[no-any-return] # mypy - - -def save_dict(filepath:str, content:dict[str, Any]) -> None: - filepath = os.path.abspath(filepath) - LOG.info("Saving [%s]...", filepath) - with open(filepath, "w", encoding = "utf-8") as file: - if filepath.endswith(".json"): - file.write(json.dumps(content, indent = 2, ensure_ascii = False)) - else: - yaml = YAML() - yaml.indent(mapping = 2, sequence = 4, offset = 2) - yaml.representer.add_representer(str, # use YAML | block style for multi-line strings - lambda dumper, data: - dumper.represent_scalar('tag:yaml.org,2002:str', data, style = '|' if '\n' in data else None) - ) - yaml.allow_duplicate_keys = False - yaml.explicit_start = False - yaml.dump(content, file) - - -def parse_decimal(number:float | int | str) -> decimal.Decimal: - """ - >>> parse_decimal(5) - Decimal('5') - >>> parse_decimal(5.5) - Decimal('5.5') - >>> parse_decimal("5.5") - Decimal('5.5') - >>> parse_decimal("5,5") - Decimal('5.5') - >>> parse_decimal("1.005,5") - Decimal('1005.5') - >>> parse_decimal("1,005.5") - Decimal('1005.5') - """ - try: - return decimal.Decimal(number) - except decimal.InvalidOperation as ex: - parts = re.split("[.,]", str(number)) - try: - return decimal.Decimal("".join(parts[:-1]) + "." + parts[-1]) - except decimal.InvalidOperation: - raise decimal.DecimalException(f"Invalid number format: {number}") from ex - - -def parse_datetime(date:datetime | str | None) -> datetime | None: - """ - >>> parse_datetime(datetime(2020, 1, 1, 0, 0)) - datetime.datetime(2020, 1, 1, 0, 0) - >>> parse_datetime("2020-01-01T00:00:00") - datetime.datetime(2020, 1, 1, 0, 0) - >>> parse_datetime(None) - - """ - if date is None: - return None - if isinstance(date, datetime): - return date - return datetime.fromisoformat(date) - - -def calculate_content_hash(ad_cfg: dict[str, Any]) -> str: - """Calculate a hash for user-modifiable fields of the ad.""" - - # Relevant fields for the hash - content = { - "active": bool(ad_cfg.get("active", True)), # Explicitly convert to bool - "type": str(ad_cfg.get("type", "")), # Explicitly convert to string - "title": str(ad_cfg.get("title", "")), - "description": str(ad_cfg.get("description", "")), - "category": str(ad_cfg.get("category", "")), - "price": str(ad_cfg.get("price", "")), # Price always as string - "price_type": str(ad_cfg.get("price_type", "")), - "special_attributes": dict(ad_cfg.get("special_attributes") or {}), # Handle None case - "shipping_type": str(ad_cfg.get("shipping_type", "")), - "shipping_costs": str(ad_cfg.get("shipping_costs", "")), - "shipping_options": sorted([str(x) for x in (ad_cfg.get("shipping_options") or [])]), # Handle None case - "sell_directly": bool(ad_cfg.get("sell_directly", False)), # Explicitly convert to bool - "images": sorted([os.path.basename(str(img)) if img is not None else "" for img in (ad_cfg.get("images") or [])]), # Handle None values in images - "contact": { - "name": str(ad_cfg.get("contact", {}).get("name", "")), - "street": str(ad_cfg.get("contact", {}).get("street", "")), # Changed from "None" to empty string for consistency - "zipcode": str(ad_cfg.get("contact", {}).get("zipcode", "")), - "phone": str(ad_cfg.get("contact", {}).get("phone", "")) - } - } - - # Create sorted JSON string for consistent hashes - content_str = json.dumps(content, sort_keys=True) - return hashlib.sha256(content_str.encode()).hexdigest() diff --git a/src/kleinanzeigen_bot/utils/__init__.py b/src/kleinanzeigen_bot/utils/__init__.py new file mode 100644 index 0000000..51a4f69 --- /dev/null +++ b/src/kleinanzeigen_bot/utils/__init__.py @@ -0,0 +1,3 @@ +""" +This module contains generic, reusable code. +""" diff --git a/src/kleinanzeigen_bot/utils/dicts.py b/src/kleinanzeigen_bot/utils/dicts.py new file mode 100644 index 0000000..73ea914 --- /dev/null +++ b/src/kleinanzeigen_bot/utils/dicts.py @@ -0,0 +1,120 @@ +""" +SPDX-FileCopyrightText: © Sebastian Thomschke and contributors +SPDX-License-Identifier: AGPL-3.0-or-later +SPDX-ArtifactOfProjectHomePage: https://github.com/Second-Hand-Friends/kleinanzeigen-bot/ +""" +import copy, json, os +from collections.abc import Callable +from importlib.resources import read_text as get_resource_as_string +from gettext import gettext as _ +from types import ModuleType +from typing import Any, Final + +from ruamel.yaml import YAML +from . import files, loggers # pylint: disable=cyclic-import + +LOG:Final[loggers.Logger] = loggers.get_logger(__name__) + + +def apply_defaults( + target:dict[Any, Any], + defaults:dict[Any, Any], + ignore:Callable[[Any, Any], bool] = lambda _k, _v: False, + override:Callable[[Any, Any], bool] = lambda _k, _v: False +) -> dict[Any, Any]: + """ + >>> apply_defaults({}, {"foo": "bar"}) + {'foo': 'bar'} + >>> apply_defaults({"foo": "foo"}, {"foo": "bar"}) + {'foo': 'foo'} + >>> apply_defaults({"foo": ""}, {"foo": "bar"}) + {'foo': ''} + >>> apply_defaults({}, {"foo": "bar"}, ignore = lambda k, _: k == "foo") + {} + >>> apply_defaults({"foo": ""}, {"foo": "bar"}, override = lambda _, v: v == "") + {'foo': 'bar'} + >>> apply_defaults({"foo": None}, {"foo": "bar"}, override = lambda _, v: v == "") + {'foo': None} + """ + for key, default_value in defaults.items(): + if key in target: + if isinstance(target[key], dict) and isinstance(default_value, dict): + apply_defaults(target[key], default_value, ignore = ignore) + elif override(key, target[key]): + target[key] = copy.deepcopy(default_value) + elif not ignore(key, default_value): + target[key] = copy.deepcopy(default_value) + return target + + +def load_dict(filepath:str, content_label:str = "") -> dict[str, Any]: + """ + :raises FileNotFoundError + """ + data = load_dict_if_exists(filepath, content_label) + if data is None: + raise FileNotFoundError(filepath) + return data + + +def load_dict_if_exists(filepath:str, content_label:str = "") -> dict[str, Any] | None: + abs_filepath = files.abspath(filepath) + LOG.info("Loading %s[%s]...", content_label and content_label + _(" from ") or "", abs_filepath) + + __, file_ext = os.path.splitext(filepath) + if file_ext not in (".json", ".yaml", ".yml"): + raise ValueError(_('Unsupported file type. The filename "%s" must end with *.json, *.yaml, or *.yml') % filepath) + + if not os.path.exists(filepath): + return None + + with open(filepath, encoding = "utf-8") as file: + return json.load(file) if filepath.endswith(".json") else YAML().load(file) # type: ignore[no-any-return] # mypy + + +def load_dict_from_module(module:ModuleType, filename:str, content_label:str = "") -> dict[str, Any]: + """ + :raises FileNotFoundError + """ + LOG.debug("Loading %s[%s.%s]...", content_label and content_label + " from " or "", module.__name__, filename) + + __, file_ext = os.path.splitext(filename) + if file_ext not in (".json", ".yaml", ".yml"): + raise ValueError(f'Unsupported file type. The filename "{filename}" must end with *.json, *.yaml, or *.yml') + + content = get_resource_as_string(module, filename) # pylint: disable=deprecated-method + return json.loads(content) if filename.endswith(".json") else YAML().load(content) # type: ignore[no-any-return] # mypy + + +def save_dict(filepath:str, content:dict[str, Any]) -> None: + filepath = files.abspath(filepath) + LOG.info("Saving [%s]...", filepath) + with open(filepath, "w", encoding = "utf-8") as file: + if filepath.endswith(".json"): + file.write(json.dumps(content, indent = 2, ensure_ascii = False)) + else: + yaml = YAML() + yaml.indent(mapping = 2, sequence = 4, offset = 2) + yaml.representer.add_representer(str, # use YAML | block style for multi-line strings + lambda dumper, data: + dumper.represent_scalar('tag:yaml.org,2002:str', data, style = '|' if '\n' in data else None) + ) + yaml.allow_duplicate_keys = False + yaml.explicit_start = False + yaml.dump(content, file) + + +def safe_get(a_map:dict[Any, Any], *keys:str) -> Any: + """ + >>> safe_get({"foo": {}}, "foo", "bar") is None + True + >>> safe_get({"foo": {"bar": "some_value"}}, "foo", "bar") + 'some_value' + """ + if a_map: + for key in keys: + try: + a_map = a_map[key] + except (KeyError, TypeError): + return None + return a_map diff --git a/src/kleinanzeigen_bot/utils/error_handlers.py b/src/kleinanzeigen_bot/utils/error_handlers.py new file mode 100644 index 0000000..6867b74 --- /dev/null +++ b/src/kleinanzeigen_bot/utils/error_handlers.py @@ -0,0 +1,28 @@ +""" +SPDX-FileCopyrightText: © Sebastian Thomschke and contributors +SPDX-License-Identifier: AGPL-3.0-or-later +SPDX-ArtifactOfProjectHomePage: https://github.com/Second-Hand-Friends/kleinanzeigen-bot/ +""" +import sys, traceback +from types import FrameType, TracebackType +from typing import Any, Final + +from . import loggers + +LOG:Final[loggers.Logger] = loggers.get_logger(__name__) + + +def on_exception(ex_type:type[BaseException], ex_value:Any, ex_traceback:TracebackType | None) -> None: + if issubclass(ex_type, KeyboardInterrupt): + sys.__excepthook__(ex_type, ex_value, ex_traceback) + elif loggers.is_debug(LOG) or isinstance(ex_value, (AttributeError, ImportError, NameError, TypeError)): + LOG.error("".join(traceback.format_exception(ex_type, ex_value, ex_traceback))) + elif isinstance(ex_value, AssertionError): + LOG.error(ex_value) + else: + LOG.error("%s: %s", ex_type.__name__, ex_value) + + +def on_sigint(_sig:int, _frame:FrameType | None) -> None: + LOG.warning("Aborted on user request.") + sys.exit(0) diff --git a/src/kleinanzeigen_bot/utils/files.py b/src/kleinanzeigen_bot/utils/files.py new file mode 100644 index 0000000..b9840f4 --- /dev/null +++ b/src/kleinanzeigen_bot/utils/files.py @@ -0,0 +1,22 @@ +""" +SPDX-FileCopyrightText: © Sebastian Thomschke and contributors +SPDX-License-Identifier: AGPL-3.0-or-later +SPDX-ArtifactOfProjectHomePage: https://github.com/Second-Hand-Friends/kleinanzeigen-bot/ +""" +import os + + +def abspath(relative_path:str, relative_to:str | None = None) -> str: + """ + Makes a given relative path absolute based on another file/folder + """ + if not relative_to: + return os.path.abspath(relative_path) + + if os.path.isabs(relative_path): + return relative_path + + if os.path.isfile(relative_to): + relative_to = os.path.dirname(relative_to) + + return os.path.normpath(os.path.join(relative_to, relative_path)) diff --git a/src/kleinanzeigen_bot/i18n.py b/src/kleinanzeigen_bot/utils/i18n.py similarity index 88% rename from src/kleinanzeigen_bot/i18n.py rename to src/kleinanzeigen_bot/utils/i18n.py index a9dbc0c..f5ff5fe 100644 --- a/src/kleinanzeigen_bot/i18n.py +++ b/src/kleinanzeigen_bot/utils/i18n.py @@ -7,14 +7,18 @@ import ctypes, gettext, inspect, locale, logging, os, sys from collections.abc import Sized from typing import Any, Final, NamedTuple -from . import resources, utils # pylint: disable=cyclic-import +from kleinanzeigen_bot import resources +from . import reflect +from . import dicts __all__ = [ "Locale", - "get_translating_logger", + "get_current_locale", + "pluralize", + "set_current_locale", + "translate" ] -LOG_ROOT:Final[logging.Logger] = logging.getLogger() LOG:Final[logging.Logger] = logging.getLogger(__name__) @@ -96,7 +100,7 @@ def translate(text:object, caller: inspect.FrameInfo | None) -> str: global _TRANSLATIONS if _TRANSLATIONS is None: try: - _TRANSLATIONS = utils.load_dict_from_module(resources, f"translations.{_CURRENT_LOCALE[0]}.yaml") + _TRANSLATIONS = dicts.load_dict_from_module(resources, f"translations.{_CURRENT_LOCALE[0]}.yaml") except FileNotFoundError: _TRANSLATIONS = {} @@ -108,7 +112,7 @@ def translate(text:object, caller: inspect.FrameInfo | None) -> str: if module_name and module_name.endswith(f".{file_basename}"): module_name = module_name[:-(len(file_basename) + 1)] file_key = f"{file_basename}.py" if module_name == file_basename else f"{module_name}/{file_basename}.py" - translation = utils.safe_get(_TRANSLATIONS, + translation = dicts.safe_get(_TRANSLATIONS, file_key, caller.function, text @@ -116,8 +120,9 @@ def translate(text:object, caller: inspect.FrameInfo | None) -> str: return translation if translation else text +# replace gettext.gettext with custom _translate function _original_gettext = gettext.gettext -gettext.gettext = lambda message: translate(_original_gettext(message), utils.get_caller()) +gettext.gettext = lambda message: translate(_original_gettext(message), reflect.get_caller()) for module_name, module in sys.modules.items(): if module is None or module_name in sys.builtin_module_names: continue @@ -127,19 +132,6 @@ for module_name, module in sys.modules.items(): setattr(module, 'gettext', gettext.gettext) -def get_translating_logger(name: str | None = None) -> logging.Logger: - - class TranslatingLogger(logging.Logger): - - def _log(self, level: int, msg: object, *args: Any, **kwargs: Any) -> None: - if level != logging.DEBUG: # debug messages should not be translated - msg = translate(msg, utils.get_caller(2)) - super()._log(level, msg, *args, **kwargs) - - logging.setLoggerClass(TranslatingLogger) - return logging.getLogger(name) - - def get_current_locale() -> Locale: return _CURRENT_LOCALE @@ -161,7 +153,7 @@ def pluralize(noun:str, count:int | Sized, prefix_with_count:bool = True) -> str >>> pluralize("field", 2, prefix_with_count = False) 'fields' """ - noun = translate(noun, utils.get_caller()) + noun = translate(noun, reflect.get_caller()) if isinstance(count, Sized): count = len(count) diff --git a/src/kleinanzeigen_bot/utils/loggers.py b/src/kleinanzeigen_bot/utils/loggers.py new file mode 100644 index 0000000..9171e32 --- /dev/null +++ b/src/kleinanzeigen_bot/utils/loggers.py @@ -0,0 +1,116 @@ +""" +SPDX-FileCopyrightText: © Sebastian Thomschke and contributors +SPDX-License-Identifier: AGPL-3.0-or-later +SPDX-ArtifactOfProjectHomePage: https://github.com/Second-Hand-Friends/kleinanzeigen-bot/ +""" +import copy, logging, re, sys +from gettext import gettext as _ +from typing import Any, Final # @UnusedImport + +import colorama +from . import i18n, reflect + +__all__ = [ + "Logger", + "LOG_ROOT", + "DEBUG", + "INFO", + "configure_console_logging", + "flush_all_handlers", + "get_logger" +] + +Logger = logging.Logger +DEBUG:Final[int] = logging.DEBUG +INFO:Final[int] = logging.INFO + +LOG_ROOT:Final[logging.Logger] = logging.getLogger() + + +def configure_console_logging() -> None: + + class CustomFormatter(logging.Formatter): + LEVEL_COLORS = { + logging.DEBUG: colorama.Fore.BLACK + colorama.Style.BRIGHT, + logging.INFO: colorama.Fore.BLACK + colorama.Style.BRIGHT, + logging.WARNING: colorama.Fore.YELLOW, + logging.ERROR: colorama.Fore.RED, + logging.CRITICAL: colorama.Fore.RED, + } + MESSAGE_COLORS = { + logging.DEBUG: colorama.Fore.BLACK + colorama.Style.BRIGHT, + logging.INFO: colorama.Fore.RESET, + logging.WARNING: colorama.Fore.YELLOW, + logging.ERROR: colorama.Fore.RED, + logging.CRITICAL: colorama.Fore.RED + colorama.Style.BRIGHT, + } + VALUE_COLORS = { + logging.DEBUG: colorama.Fore.BLACK + colorama.Style.BRIGHT, + logging.INFO: colorama.Fore.MAGENTA, + logging.WARNING: colorama.Fore.MAGENTA, + logging.ERROR: colorama.Fore.MAGENTA, + logging.CRITICAL: colorama.Fore.MAGENTA, + } + + def format(self, record:logging.LogRecord) -> str: + record = copy.deepcopy(record) + + level_color = self.LEVEL_COLORS.get(record.levelno, "") + msg_color = self.MESSAGE_COLORS.get(record.levelno, "") + value_color = self.VALUE_COLORS.get(record.levelno, "") + + # translate and colorize log level name + levelname = _(record.levelname) if record.levelno > logging.DEBUG else record.levelname + record.levelname = f"{level_color}[{levelname}]{colorama.Style.RESET_ALL}" + + # highlight message values enclosed by [...], "...", and '...' + record.msg = re.sub( + r"\[([^\]]+)\]|\"([^\"]+)\"|\'([^\']+)\'", + lambda match: f"[{value_color}{match.group(1) or match.group(2) or match.group(3)}{colorama.Fore.RESET}{msg_color}]", + str(record.msg), + ) + + # colorize message + record.msg = f"{msg_color}{record.msg}{colorama.Style.RESET_ALL}" + + return super().format(record) + + formatter = CustomFormatter("%(levelname)s %(message)s") + + stdout_log = logging.StreamHandler(sys.stderr) + stdout_log.setLevel(logging.DEBUG) + stdout_log.addFilter(type("", (logging.Filter,), { + "filter": lambda rec: rec.levelno <= logging.INFO + })) + stdout_log.setFormatter(formatter) + LOG_ROOT.addHandler(stdout_log) + + stderr_log = logging.StreamHandler(sys.stderr) + stderr_log.setLevel(logging.WARNING) + stderr_log.setFormatter(formatter) + LOG_ROOT.addHandler(stderr_log) + + +def flush_all_handlers() -> None: + for handler in LOG_ROOT.handlers: + handler.flush() + + +def get_logger(name: str | None = None) -> logging.Logger: + """ + Returns a localized logger + """ + + class TranslatingLogger(logging.Logger): + + def _log(self, level: int, msg: object, *args: Any, **kwargs: Any) -> None: + if level != logging.DEBUG: # debug messages should not be translated + msg = i18n.translate(msg, reflect.get_caller(2)) + super()._log(level, msg, *args, **kwargs) + + logging.setLoggerClass(TranslatingLogger) + return logging.getLogger(name) + + +def is_debug(logger:Logger) -> bool: + return logger.isEnabledFor(logging.DEBUG) diff --git a/src/kleinanzeigen_bot/utils/misc.py b/src/kleinanzeigen_bot/utils/misc.py new file mode 100644 index 0000000..5db73a2 --- /dev/null +++ b/src/kleinanzeigen_bot/utils/misc.py @@ -0,0 +1,90 @@ +""" +SPDX-FileCopyrightText: © Sebastian Thomschke and contributors +SPDX-License-Identifier: AGPL-3.0-or-later +SPDX-ArtifactOfProjectHomePage: https://github.com/Second-Hand-Friends/kleinanzeigen-bot/ +""" +import asyncio, decimal, re, sys, time +from collections.abc import Callable +from datetime import datetime +from gettext import gettext as _ +from typing import Any, TypeVar + +# https://mypy.readthedocs.io/en/stable/generics.html#generic-functions +T = TypeVar('T') + + +def ensure(condition:Any | bool | Callable[[], bool], error_message:str, timeout:float = 5, poll_requency:float = 0.5) -> None: + """ + :param timeout: timespan in seconds until when the condition must become `True`, default is 5 seconds + :param poll_requency: sleep interval between calls in seconds, default is 0.5 seconds + :raises AssertionError: if condition did not come `True` within given timespan + """ + if not isinstance(condition, Callable): # type: ignore[arg-type] # https://github.com/python/mypy/issues/6864 + if condition: + return + raise AssertionError(_(error_message)) + + if timeout < 0: + raise AssertionError("[timeout] must be >= 0") + if poll_requency < 0: + raise AssertionError("[poll_requency] must be >= 0") + + start_at = time.time() + while not condition(): # type: ignore[operator] + elapsed = time.time() - start_at + if elapsed >= timeout: + raise AssertionError(_(error_message)) + time.sleep(poll_requency) + + +def is_frozen() -> bool: + """ + >>> is_frozen() + False + """ + return getattr(sys, "frozen", False) + + +async def ainput(prompt: str) -> str: + return await asyncio.to_thread(input, f'{prompt} ') + + +def parse_decimal(number:float | int | str) -> decimal.Decimal: + """ + >>> parse_decimal(5) + Decimal('5') + >>> parse_decimal(5.5) + Decimal('5.5') + >>> parse_decimal("5.5") + Decimal('5.5') + >>> parse_decimal("5,5") + Decimal('5.5') + >>> parse_decimal("1.005,5") + Decimal('1005.5') + >>> parse_decimal("1,005.5") + Decimal('1005.5') + """ + try: + return decimal.Decimal(number) + except decimal.InvalidOperation as ex: + parts = re.split("[.,]", str(number)) + try: + return decimal.Decimal("".join(parts[:-1]) + "." + parts[-1]) + except decimal.InvalidOperation: + raise decimal.DecimalException(f"Invalid number format: {number}") from ex + + +def parse_datetime(date:datetime | str | None) -> datetime | None: + """ + >>> parse_datetime(datetime(2020, 1, 1, 0, 0)) + datetime.datetime(2020, 1, 1, 0, 0) + >>> parse_datetime("2020-01-01T00:00:00") + datetime.datetime(2020, 1, 1, 0, 0) + >>> parse_datetime(None) + + """ + if date is None: + return None + if isinstance(date, datetime): + return date + return datetime.fromisoformat(date) diff --git a/src/kleinanzeigen_bot/utils/net.py b/src/kleinanzeigen_bot/utils/net.py new file mode 100644 index 0000000..9a9435d --- /dev/null +++ b/src/kleinanzeigen_bot/utils/net.py @@ -0,0 +1,20 @@ +""" +SPDX-FileCopyrightText: © Sebastian Thomschke and contributors +SPDX-License-Identifier: AGPL-3.0-or-later +SPDX-ArtifactOfProjectHomePage: https://github.com/Second-Hand-Friends/kleinanzeigen-bot/ +""" +import socket + + +def is_port_open(host:str, port:int) -> bool: + s:socket.socket | None = None + try: + s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + s.settimeout(1) + s.connect((host, port)) + return True + except Exception: + return False + finally: + if s: + s.close() diff --git a/src/kleinanzeigen_bot/utils/reflect.py b/src/kleinanzeigen_bot/utils/reflect.py new file mode 100644 index 0000000..694af77 --- /dev/null +++ b/src/kleinanzeigen_bot/utils/reflect.py @@ -0,0 +1,26 @@ +""" +SPDX-FileCopyrightText: © Sebastian Thomschke and contributors +SPDX-License-Identifier: AGPL-3.0-or-later +SPDX-ArtifactOfProjectHomePage: https://github.com/Second-Hand-Friends/kleinanzeigen-bot/ +""" +import inspect +from typing import Any + + +def get_caller(depth: int = 1) -> inspect.FrameInfo | None: + stack = inspect.stack() + try: + for frame in stack[depth + 1:]: + if frame.function and frame.function != "": + return frame + return None + finally: + del stack # Clean up the stack to avoid reference cycles + + +def is_integer(obj:Any) -> bool: + try: + int(obj) + return True + except (ValueError, TypeError): + return False diff --git a/src/kleinanzeigen_bot/web_scraping_mixin.py b/src/kleinanzeigen_bot/utils/web_scraping_mixin.py similarity index 98% rename from src/kleinanzeigen_bot/web_scraping_mixin.py rename to src/kleinanzeigen_bot/utils/web_scraping_mixin.py index abaabda..08f5341 100644 --- a/src/kleinanzeigen_bot/web_scraping_mixin.py +++ b/src/kleinanzeigen_bot/utils/web_scraping_mixin.py @@ -3,7 +3,7 @@ SPDX-FileCopyrightText: © Sebastian Thomschke and contributors SPDX-License-Identifier: AGPL-3.0-or-later SPDX-ArtifactOfProjectHomePage: https://github.com/Second-Hand-Friends/kleinanzeigen-bot/ """ -import asyncio, enum, inspect, json, logging, os, platform, secrets, shutil, time +import asyncio, enum, inspect, json, os, platform, secrets, shutil, time from collections.abc import Callable, Coroutine, Iterable from gettext import gettext as _ from typing import cast, Any, Final @@ -19,8 +19,8 @@ from nodriver.core.config import Config from nodriver.core.element import Element from nodriver.core.tab import Tab as Page -from .i18n import get_translating_logger -from .utils import ensure, is_port_open, T +from . import loggers, net +from .misc import ensure, T __all__ = [ "Browser", @@ -32,7 +32,7 @@ __all__ = [ "WebScrapingMixin", ] -LOG:Final[logging.Logger] = get_translating_logger(__name__) +LOG:Final[loggers.Logger] = loggers.get_logger(__name__) # see https://api.jquery.com/category/selectors/ METACHAR_ESCAPER:Final[dict[int, str]] = str.maketrans({ch: f'\\{ch}' for ch in '!"#$%&\'()*+,./:;<=>?@[\\]^`{|}~'}) @@ -95,7 +95,7 @@ class WebScrapingMixin: if remote_port > 0: LOG.info("Using existing browser process at %s:%s", remote_host, remote_port) - ensure(is_port_open(remote_host, remote_port), + ensure(net.is_port_open(remote_host, remote_port), f"Browser process not reachable at {remote_host}:{remote_port}. " + f"Start the browser with --remote-debugging-port={remote_port} or remove this port from your config.yaml") cfg = Config( @@ -146,7 +146,7 @@ class WebScrapingMixin: LOG.info(" -> Custom Browser argument: %s", browser_arg) browser_args.append(browser_arg) - if not LOG.isEnabledFor(logging.DEBUG): + if not loggers.is_debug(LOG): browser_args.append("--log-level=3") # INFO: 0, WARNING: 1, ERROR: 2, FATAL: 3 if self.browser_config.user_data_dir: @@ -483,7 +483,7 @@ class WebScrapingMixin: async def web_sleep(self, min_ms:int = 1000, max_ms:int = 2500) -> None: duration = max_ms <= min_ms and min_ms or secrets.randbelow(max_ms - min_ms) + min_ms - LOG.log(logging.INFO if duration > 1500 else logging.DEBUG, " ... pausing for %d ms ...", duration) + LOG.log(loggers.INFO if duration > 1500 else loggers.DEBUG, " ... pausing for %d ms ...", duration) await self.page.sleep(duration / 1000) async def web_request(self, url:str, method:str = "GET", valid_response_codes:int | Iterable[int] = 200, diff --git a/tests/conftest.py b/tests/conftest.py index d19d1b0..861d32b 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -3,21 +3,21 @@ SPDX-FileCopyrightText: © Jens Bergmann and contributors SPDX-License-Identifier: AGPL-3.0-or-later SPDX-ArtifactOfProjectHomePage: https://github.com/Second-Hand-Friends/kleinanzeigen-bot/ """ -import logging, os +import os from typing import Any, Final from unittest.mock import MagicMock import pytest -from kleinanzeigen_bot import KleinanzeigenBot, utils +from kleinanzeigen_bot import KleinanzeigenBot +from kleinanzeigen_bot.utils import loggers from kleinanzeigen_bot.extract import AdExtractor -from kleinanzeigen_bot.i18n import get_translating_logger -from kleinanzeigen_bot.web_scraping_mixin import Browser +from kleinanzeigen_bot.utils.web_scraping_mixin import Browser -utils.configure_console_logging() +loggers.configure_console_logging() -LOG: Final[logging.Logger] = get_translating_logger("kleinanzeigen_bot") -LOG.setLevel(logging.DEBUG) +LOG:Final[loggers.Logger] = loggers.get_logger("kleinanzeigen_bot") +LOG.setLevel(loggers.DEBUG) @pytest.fixture @@ -85,7 +85,7 @@ def browser_mock() -> MagicMock: This mock is configured with the Browser spec to ensure it has all the required methods and attributes of a real Browser instance. """ - return MagicMock(spec=Browser) + return MagicMock(spec = Browser) @pytest.fixture diff --git a/tests/integration/test_web_scraping_mixin.py b/tests/integration/test_web_scraping_mixin.py index 809ed26..d0ae00c 100644 --- a/tests/integration/test_web_scraping_mixin.py +++ b/tests/integration/test_web_scraping_mixin.py @@ -3,18 +3,18 @@ SPDX-FileCopyrightText: © Sebastian Thomschke and contributors SPDX-License-Identifier: AGPL-3.0-or-later SPDX-ArtifactOfProjectHomePage: https://github.com/Second-Hand-Friends/kleinanzeigen-bot/ """ -import logging, os, platform +import os, platform from typing import cast import nodriver, pytest -from kleinanzeigen_bot.utils import ensure -from kleinanzeigen_bot.i18n import get_translating_logger -from kleinanzeigen_bot.web_scraping_mixin import WebScrapingMixin +from kleinanzeigen_bot.utils import loggers +from kleinanzeigen_bot.utils.misc import ensure +from kleinanzeigen_bot.utils.web_scraping_mixin import WebScrapingMixin if os.environ.get("CI"): - get_translating_logger("kleinanzeigen_bot").setLevel(logging.DEBUG) - get_translating_logger("nodriver").setLevel(logging.DEBUG) + loggers.get_logger("kleinanzeigen_bot").setLevel(loggers.DEBUG) + loggers.get_logger("nodriver").setLevel(loggers.DEBUG) async def atest_init() -> None: diff --git a/tests/unit/test_utils.py b/tests/unit/test_ads.py similarity index 60% rename from tests/unit/test_utils.py rename to tests/unit/test_ads.py index c63ceaa..f5740c9 100644 --- a/tests/unit/test_utils.py +++ b/tests/unit/test_ads.py @@ -3,31 +3,7 @@ SPDX-FileCopyrightText: © Sebastian Thomschke and contributors SPDX-License-Identifier: AGPL-3.0-or-later SPDX-ArtifactOfProjectHomePage: https://github.com/Second-Hand-Friends/kleinanzeigen-bot/ """ -import pytest -from kleinanzeigen_bot import utils - - -def test_ensure() -> None: - utils.ensure(True, "TRUE") - utils.ensure("Some Value", "TRUE") - utils.ensure(123, "TRUE") - utils.ensure(-123, "TRUE") - utils.ensure(lambda: True, "TRUE") - - with pytest.raises(AssertionError): - utils.ensure(False, "FALSE") - - with pytest.raises(AssertionError): - utils.ensure(0, "FALSE") - - with pytest.raises(AssertionError): - utils.ensure("", "FALSE") - - with pytest.raises(AssertionError): - utils.ensure(None, "FALSE") - - with pytest.raises(AssertionError): - utils.ensure(lambda: False, "FALSE", timeout = 2) +from kleinanzeigen_bot import ads def test_calculate_content_hash_with_none_values() -> None: @@ -48,6 +24,6 @@ def test_calculate_content_hash_with_none_values() -> None: } # Should not raise TypeError - hash_value = utils.calculate_content_hash(ad_cfg) + hash_value = ads.calculate_content_hash(ad_cfg) assert isinstance(hash_value, str) assert len(hash_value) == 64 # SHA-256 hash is 64 characters long diff --git a/tests/unit/test_extract.py b/tests/unit/test_extract.py index 45af61b..f898461 100644 --- a/tests/unit/test_extract.py +++ b/tests/unit/test_extract.py @@ -1,5 +1,5 @@ """ -SPDX-FileCopyrightText: © Sebastian Thomschke and contributors +SPDX-FileCopyrightText: © Jens Bergmann and contributors SPDX-License-Identifier: AGPL-3.0-or-later SPDX-ArtifactOfProjectHomePage: https://github.com/Second-Hand-Friends/kleinanzeigen-bot/ """ @@ -10,7 +10,7 @@ from unittest.mock import AsyncMock, MagicMock, call, patch import pytest from kleinanzeigen_bot.extract import AdExtractor -from kleinanzeigen_bot.web_scraping_mixin import Browser, By, Element +from kleinanzeigen_bot.utils.web_scraping_mixin import Browser, By, Element class _DimensionsDict(TypedDict): @@ -25,7 +25,7 @@ class _BelenConfDict(TypedDict): universalAnalyticsOpts: _UniversalAnalyticsOptsDict -class _SpecialAttributesDict(TypedDict, total=False): +class _SpecialAttributesDict(TypedDict, total = False): art_s: str condition_s: str @@ -77,7 +77,7 @@ class TestAdExtractorPricing: self, test_extractor: AdExtractor, price_text: str, expected_price: int | None, expected_type: str ) -> None: """Test price extraction with different formats""" - with patch.object(test_extractor, 'web_text', new_callable=AsyncMock, return_value=price_text): + with patch.object(test_extractor, 'web_text', new_callable = AsyncMock, return_value = price_text): price, price_type = await test_extractor._extract_pricing_info_from_ad_page() assert price == expected_price assert price_type == expected_type @@ -86,7 +86,7 @@ class TestAdExtractorPricing: # pylint: disable=protected-access async def test_extract_pricing_info_timeout(self, test_extractor: AdExtractor) -> None: """Test price extraction when element is not found""" - with patch.object(test_extractor, 'web_text', new_callable=AsyncMock, side_effect=TimeoutError): + with patch.object(test_extractor, 'web_text', new_callable = AsyncMock, side_effect = TimeoutError): price, price_type = await test_extractor._extract_pricing_info_from_ad_page() assert price is None assert price_type == "NOT_APPLICABLE" @@ -110,8 +110,8 @@ class TestAdExtractorShipping: ) -> None: """Test shipping info extraction with different text formats.""" with patch.object(test_extractor, 'page', MagicMock()), \ - patch.object(test_extractor, 'web_text', new_callable=AsyncMock, return_value=shipping_text), \ - patch.object(test_extractor, 'web_request', new_callable=AsyncMock) as mock_web_request: + patch.object(test_extractor, 'web_text', new_callable = AsyncMock, return_value = shipping_text), \ + patch.object(test_extractor, 'web_request', new_callable = AsyncMock) as mock_web_request: if expected_cost: shipping_response: dict[str, Any] = { @@ -151,8 +151,8 @@ class TestAdExtractorShipping: } with patch.object(test_extractor, 'page', MagicMock()), \ - patch.object(test_extractor, 'web_text', new_callable=AsyncMock, return_value="+ Versand ab 5,49 €"), \ - patch.object(test_extractor, 'web_request', new_callable=AsyncMock, return_value=shipping_response): + patch.object(test_extractor, 'web_text', new_callable = AsyncMock, return_value = "+ Versand ab 5,49 €"), \ + patch.object(test_extractor, 'web_request', new_callable = AsyncMock, return_value = shipping_response): shipping_type, costs, options = await test_extractor._extract_shipping_info_from_ad_page() @@ -171,8 +171,8 @@ class TestAdExtractorNavigation: page_mock.url = "https://www.kleinanzeigen.de/s-anzeige/test/12345" with patch.object(test_extractor, 'page', page_mock), \ - patch.object(test_extractor, 'web_open', new_callable=AsyncMock) as mock_web_open, \ - patch.object(test_extractor, 'web_find', new_callable=AsyncMock, side_effect=TimeoutError): + patch.object(test_extractor, 'web_open', new_callable = AsyncMock) as mock_web_open, \ + patch.object(test_extractor, 'web_find', new_callable = AsyncMock, side_effect = TimeoutError): result = await test_extractor.naviagte_to_ad_page("https://www.kleinanzeigen.de/s-anzeige/test/12345") assert result is True @@ -186,16 +186,16 @@ class TestAdExtractorNavigation: submit_button_mock = AsyncMock() submit_button_mock.click = AsyncMock() - submit_button_mock.apply = AsyncMock(return_value=True) + submit_button_mock.apply = AsyncMock(return_value = True) input_mock = AsyncMock() input_mock.clear_input = AsyncMock() input_mock.send_keys = AsyncMock() - input_mock.apply = AsyncMock(return_value=True) + input_mock.apply = AsyncMock(return_value = True) popup_close_mock = AsyncMock() popup_close_mock.click = AsyncMock() - popup_close_mock.apply = AsyncMock(return_value=True) + popup_close_mock.apply = AsyncMock(return_value = True) def find_mock(selector_type: By, selector_value: str, **_: Any) -> Element | None: if selector_type == By.ID and selector_value == "site-search-query": @@ -207,10 +207,10 @@ class TestAdExtractorNavigation: return None with patch.object(test_extractor, 'page', page_mock), \ - patch.object(test_extractor, 'web_open', new_callable=AsyncMock) as mock_web_open, \ - patch.object(test_extractor, 'web_input', new_callable=AsyncMock), \ - patch.object(test_extractor, 'web_check', new_callable=AsyncMock, return_value=True), \ - patch.object(test_extractor, 'web_find', new_callable=AsyncMock, side_effect=find_mock): + patch.object(test_extractor, 'web_open', new_callable = AsyncMock) as mock_web_open, \ + patch.object(test_extractor, 'web_input', new_callable = AsyncMock), \ + patch.object(test_extractor, 'web_check', new_callable = AsyncMock, return_value = True), \ + patch.object(test_extractor, 'web_find', new_callable = AsyncMock, side_effect = find_mock): result = await test_extractor.naviagte_to_ad_page(12345) assert result is True @@ -227,13 +227,13 @@ class TestAdExtractorNavigation: input_mock = AsyncMock() input_mock.clear_input = AsyncMock() input_mock.send_keys = AsyncMock() - input_mock.apply = AsyncMock(return_value=True) + input_mock.apply = AsyncMock(return_value = True) with patch.object(test_extractor, 'page', page_mock), \ - patch.object(test_extractor, 'web_open', new_callable=AsyncMock), \ - patch.object(test_extractor, 'web_find', new_callable=AsyncMock, return_value=input_mock), \ - patch.object(test_extractor, 'web_click', new_callable=AsyncMock) as mock_web_click, \ - patch.object(test_extractor, 'web_check', new_callable=AsyncMock, return_value=True): + patch.object(test_extractor, 'web_open', new_callable = AsyncMock), \ + patch.object(test_extractor, 'web_find', new_callable = AsyncMock, return_value = input_mock), \ + patch.object(test_extractor, 'web_click', new_callable = AsyncMock) as mock_web_click, \ + patch.object(test_extractor, 'web_check', new_callable = AsyncMock, return_value = True): result = await test_extractor.naviagte_to_ad_page(12345) assert result is True @@ -248,12 +248,12 @@ class TestAdExtractorNavigation: input_mock = AsyncMock() input_mock.clear_input = AsyncMock() input_mock.send_keys = AsyncMock() - input_mock.apply = AsyncMock(return_value=True) + input_mock.apply = AsyncMock(return_value = True) input_mock.attrs = {} with patch.object(test_extractor, 'page', page_mock), \ - patch.object(test_extractor, 'web_open', new_callable=AsyncMock), \ - patch.object(test_extractor, 'web_find', new_callable=AsyncMock, return_value=input_mock): + patch.object(test_extractor, 'web_open', new_callable = AsyncMock), \ + patch.object(test_extractor, 'web_find', new_callable = AsyncMock, return_value = input_mock): result = await test_extractor.naviagte_to_ad_page(99999) assert result is False @@ -261,12 +261,12 @@ class TestAdExtractorNavigation: @pytest.mark.asyncio async def test_extract_own_ads_urls(self, test_extractor: AdExtractor) -> None: """Test extraction of own ads URLs - basic test.""" - with patch.object(test_extractor, 'web_open', new_callable=AsyncMock), \ - patch.object(test_extractor, 'web_sleep', new_callable=AsyncMock), \ - patch.object(test_extractor, 'web_find', new_callable=AsyncMock) as mock_web_find, \ - patch.object(test_extractor, 'web_find_all', new_callable=AsyncMock) as mock_web_find_all, \ - patch.object(test_extractor, 'web_scroll_page_down', new_callable=AsyncMock), \ - patch.object(test_extractor, 'web_execute', new_callable=AsyncMock): + with patch.object(test_extractor, 'web_open', new_callable = AsyncMock), \ + patch.object(test_extractor, 'web_sleep', new_callable = AsyncMock), \ + patch.object(test_extractor, 'web_find', new_callable = AsyncMock) as mock_web_find, \ + patch.object(test_extractor, 'web_find_all', new_callable = AsyncMock) as mock_web_find_all, \ + patch.object(test_extractor, 'web_scroll_page_down', new_callable = AsyncMock), \ + patch.object(test_extractor, 'web_execute', new_callable = AsyncMock): # Setup mock objects for DOM elements splitpage = MagicMock() @@ -280,18 +280,18 @@ class TestAdExtractorNavigation: # Setup mock responses for web_find mock_web_find.side_effect = [ - splitpage, # .l-splitpage - pagination_section, # section:nth-of-type(4) - pagination, # div > div:nth-of-type(2) > div:nth-of-type(2) > div - pagination_div, # div:nth-of-type(1) - ad_list, # my-manageitems-adlist - link # article > section > section:nth-of-type(2) > h2 > div > a + splitpage, # .l-splitpage + pagination_section, # section:nth-of-type(4) + pagination, # div > div:nth-of-type(2) > div:nth-of-type(2) > div + pagination_div, # div:nth-of-type(1) + ad_list, # my-manageitems-adlist + link # article > section > section:nth-of-type(2) > h2 > div > a ] # Setup mock responses for web_find_all mock_web_find_all.side_effect = [ - [MagicMock()], # buttons in pagination - [cardbox] # cardbox elements + [MagicMock()], # buttons in pagination + [cardbox] # cardbox elements ] # Execute test and verify results @@ -304,7 +304,7 @@ class TestAdExtractorContent: @pytest.fixture def extractor(self) -> AdExtractor: - browser_mock = MagicMock(spec=Browser) + browser_mock = MagicMock(spec = Browser) config_mock = { "ad_defaults": { "description": { @@ -326,15 +326,15 @@ class TestAdExtractorContent: category_mock.attrs = {'href': '/s-kategorie/c123'} with patch.object(extractor, 'page', page_mock), \ - patch.object(extractor, 'web_text', new_callable=AsyncMock) as mock_web_text, \ - patch.object(extractor, 'web_find', new_callable=AsyncMock, return_value=category_mock), \ - patch.object(extractor, '_extract_category_from_ad_page', new_callable=AsyncMock, return_value="17/23"), \ - patch.object(extractor, '_extract_special_attributes_from_ad_page', new_callable=AsyncMock, return_value={}), \ - patch.object(extractor, '_extract_pricing_info_from_ad_page', new_callable=AsyncMock, return_value=(None, "NOT_APPLICABLE")), \ - patch.object(extractor, '_extract_shipping_info_from_ad_page', new_callable=AsyncMock, return_value=("NOT_APPLICABLE", None, None)), \ - patch.object(extractor, '_extract_sell_directly_from_ad_page', new_callable=AsyncMock, return_value=False), \ - patch.object(extractor, '_download_images_from_ad_page', new_callable=AsyncMock, return_value=[]), \ - patch.object(extractor, '_extract_contact_from_ad_page', new_callable=AsyncMock, return_value={}): + patch.object(extractor, 'web_text', new_callable = AsyncMock) as mock_web_text, \ + patch.object(extractor, 'web_find', new_callable = AsyncMock, return_value = category_mock), \ + patch.object(extractor, '_extract_category_from_ad_page', new_callable = AsyncMock, return_value = "17/23"), \ + patch.object(extractor, '_extract_special_attributes_from_ad_page', new_callable = AsyncMock, return_value = {}), \ + patch.object(extractor, '_extract_pricing_info_from_ad_page', new_callable = AsyncMock, return_value = (None, "NOT_APPLICABLE")), \ + patch.object(extractor, '_extract_shipping_info_from_ad_page', new_callable = AsyncMock, return_value = ("NOT_APPLICABLE", None, None)), \ + patch.object(extractor, '_extract_sell_directly_from_ad_page', new_callable = AsyncMock, return_value = False), \ + patch.object(extractor, '_download_images_from_ad_page', new_callable = AsyncMock, return_value = []), \ + patch.object(extractor, '_extract_contact_from_ad_page', new_callable = AsyncMock, return_value = {}): mock_web_text.side_effect = [ "Test Title", @@ -358,11 +358,11 @@ class TestAdExtractorContent: ] for text, expected in test_cases: - with patch.object(extractor, 'web_text', new_callable=AsyncMock, return_value=text): + with patch.object(extractor, 'web_text', new_callable = AsyncMock, return_value = text): result = await extractor._extract_sell_directly_from_ad_page() assert result is expected - with patch.object(extractor, 'web_text', new_callable=AsyncMock, side_effect=TimeoutError): + with patch.object(extractor, 'web_text', new_callable = AsyncMock, side_effect = TimeoutError): result = await extractor._extract_sell_directly_from_ad_page() assert result is None @@ -372,7 +372,7 @@ class TestAdExtractorCategory: @pytest.fixture def extractor(self) -> AdExtractor: - browser_mock = MagicMock(spec=Browser) + browser_mock = MagicMock(spec = Browser) config_mock = { "ad_defaults": { "description": { @@ -393,7 +393,7 @@ class TestAdExtractorCategory: second_part = MagicMock() second_part.attrs = {'href': '/s-spielzeug/c23'} - with patch.object(extractor, 'web_find', new_callable=AsyncMock) as mock_web_find: + with patch.object(extractor, 'web_find', new_callable = AsyncMock) as mock_web_find: mock_web_find.side_effect = [ category_line, first_part, @@ -404,14 +404,14 @@ class TestAdExtractorCategory: assert result == "17/23" mock_web_find.assert_any_call(By.ID, 'vap-brdcrmb') - mock_web_find.assert_any_call(By.CSS_SELECTOR, 'a:nth-of-type(2)', parent=category_line) - mock_web_find.assert_any_call(By.CSS_SELECTOR, 'a:nth-of-type(3)', parent=category_line) + mock_web_find.assert_any_call(By.CSS_SELECTOR, 'a:nth-of-type(2)', parent = category_line) + mock_web_find.assert_any_call(By.CSS_SELECTOR, 'a:nth-of-type(3)', parent = category_line) @pytest.mark.asyncio # pylint: disable=protected-access async def test_extract_special_attributes_empty(self, extractor: AdExtractor) -> None: """Test extraction of special attributes when empty.""" - with patch.object(extractor, 'web_execute', new_callable=AsyncMock) as mock_web_execute: + with patch.object(extractor, 'web_execute', new_callable = AsyncMock) as mock_web_execute: mock_web_execute.return_value = { "universalAnalyticsOpts": { "dimensions": { @@ -428,7 +428,7 @@ class TestAdExtractorContact: @pytest.fixture def extractor(self) -> AdExtractor: - browser_mock = MagicMock(spec=Browser) + browser_mock = MagicMock(spec = Browser) config_mock = { "ad_defaults": { "description": { @@ -444,8 +444,8 @@ class TestAdExtractorContact: async def test_extract_contact_info(self, extractor: AdExtractor) -> None: """Test extraction of contact information.""" with patch.object(extractor, 'page', MagicMock()), \ - patch.object(extractor, 'web_text', new_callable=AsyncMock) as mock_web_text, \ - patch.object(extractor, 'web_find', new_callable=AsyncMock) as mock_web_find: + patch.object(extractor, 'web_text', new_callable = AsyncMock) as mock_web_text, \ + patch.object(extractor, 'web_find', new_callable = AsyncMock) as mock_web_find: mock_web_text.side_effect = [ "12345 Berlin - Mitte", @@ -472,8 +472,8 @@ class TestAdExtractorContact: async def test_extract_contact_info_timeout(self, extractor: AdExtractor) -> None: """Test contact info extraction when elements are not found.""" with patch.object(extractor, 'page', MagicMock()), \ - patch.object(extractor, 'web_text', new_callable=AsyncMock, side_effect=TimeoutError()), \ - patch.object(extractor, 'web_find', new_callable=AsyncMock, side_effect=TimeoutError()): + patch.object(extractor, 'web_text', new_callable = AsyncMock, side_effect = TimeoutError()), \ + patch.object(extractor, 'web_find', new_callable = AsyncMock, side_effect = TimeoutError()): with pytest.raises(TimeoutError): await extractor._extract_contact_from_ad_page() @@ -483,8 +483,8 @@ class TestAdExtractorContact: async def test_extract_contact_info_with_phone(self, extractor: AdExtractor) -> None: """Test extraction of contact information including phone number.""" with patch.object(extractor, 'page', MagicMock()), \ - patch.object(extractor, 'web_text', new_callable=AsyncMock) as mock_web_text, \ - patch.object(extractor, 'web_find', new_callable=AsyncMock) as mock_web_find: + patch.object(extractor, 'web_text', new_callable = AsyncMock) as mock_web_text, \ + patch.object(extractor, 'web_find', new_callable = AsyncMock) as mock_web_find: mock_web_text.side_effect = [ "12345 Berlin - Mitte", @@ -510,7 +510,7 @@ class TestAdExtractorDownload: @pytest.fixture def extractor(self) -> AdExtractor: - browser_mock = MagicMock(spec=Browser) + browser_mock = MagicMock(spec = Browser) config_mock = { "ad_defaults": { "description": { @@ -529,8 +529,8 @@ class TestAdExtractorDownload: patch('os.makedirs') as mock_makedirs, \ patch('os.mkdir') as mock_mkdir, \ patch('shutil.rmtree') as mock_rmtree, \ - patch('kleinanzeigen_bot.extract.save_dict', autospec=True) as mock_save_dict, \ - patch.object(extractor, '_extract_ad_page_info', new_callable=AsyncMock) as mock_extract: + patch('kleinanzeigen_bot.extract.dicts.save_dict', autospec = True) as mock_save_dict, \ + patch.object(extractor, '_extract_ad_page_info', new_callable = AsyncMock) as mock_extract: base_dir = 'downloaded-ads' ad_dir = os.path.join(base_dir, 'ad_12345') @@ -574,7 +574,7 @@ class TestAdExtractorDownload: # pylint: disable=protected-access async def test_download_images_no_images(self, extractor: AdExtractor) -> None: """Test image download when no images are found.""" - with patch.object(extractor, 'web_find', new_callable=AsyncMock, side_effect=TimeoutError): + with patch.object(extractor, 'web_find', new_callable = AsyncMock, side_effect = TimeoutError): image_paths = await extractor._download_images_from_ad_page("/some/dir", 12345) assert len(image_paths) == 0 @@ -586,8 +586,8 @@ class TestAdExtractorDownload: patch('os.makedirs') as mock_makedirs, \ patch('os.mkdir') as mock_mkdir, \ patch('shutil.rmtree') as mock_rmtree, \ - patch('kleinanzeigen_bot.extract.save_dict', autospec=True) as mock_save_dict, \ - patch.object(extractor, '_extract_ad_page_info', new_callable=AsyncMock) as mock_extract: + patch('kleinanzeigen_bot.extract.dicts.save_dict', autospec = True) as mock_save_dict, \ + patch.object(extractor, '_extract_ad_page_info', new_callable = AsyncMock) as mock_extract: base_dir = 'downloaded-ads' ad_dir = os.path.join(base_dir, 'ad_12345') diff --git a/tests/unit/test_i18n.py b/tests/unit/test_i18n.py index 7f4e041..11f86f1 100644 --- a/tests/unit/test_i18n.py +++ b/tests/unit/test_i18n.py @@ -5,7 +5,7 @@ SPDX-ArtifactOfProjectHomePage: https://github.com/Second-Hand-Friends/kleinanze """ import pytest from _pytest.monkeypatch import MonkeyPatch # pylint: disable=import-private-name -from kleinanzeigen_bot import i18n +from kleinanzeigen_bot.utils import i18n @pytest.mark.parametrize("lang, expected", [ diff --git a/tests/unit/test_init.py b/tests/unit/test_init.py index daf5cae..93e4896 100644 --- a/tests/unit/test_init.py +++ b/tests/unit/test_init.py @@ -15,7 +15,7 @@ from ruamel.yaml import YAML from kleinanzeigen_bot import LOG, KleinanzeigenBot from kleinanzeigen_bot._version import __version__ -from kleinanzeigen_bot.utils import calculate_content_hash +from kleinanzeigen_bot.ads import calculate_content_hash @pytest.fixture @@ -31,7 +31,7 @@ def mock_page() -> MagicMock: mock.wait_for_selector = AsyncMock() mock.wait_for_navigation = AsyncMock() mock.wait_for_load_state = AsyncMock() - mock.content = AsyncMock(return_value="") + mock.content = AsyncMock(return_value = "") mock.goto = AsyncMock() mock.close = AsyncMock() return mock @@ -132,9 +132,9 @@ def mock_config_setup(test_bot: KleinanzeigenBot) -> Generator[None]: """Provide a centralized mock configuration setup for tests. This fixture mocks load_config and other essential configuration-related methods.""" with patch.object(test_bot, 'load_config'), \ - patch.object(test_bot, 'create_browser_session', new_callable=AsyncMock), \ - patch.object(test_bot, 'login', new_callable=AsyncMock), \ - patch.object(test_bot, 'web_request', new_callable=AsyncMock) as mock_request: + patch.object(test_bot, 'create_browser_session', new_callable = AsyncMock), \ + patch.object(test_bot, 'login', new_callable = AsyncMock), \ + patch.object(test_bot, 'web_request', new_callable = AsyncMock) as mock_request: # Mock the web request for published ads mock_request.return_value = {"content": '{"ads": []}'} yield @@ -250,15 +250,15 @@ class TestKleinanzeigenBotConfiguration: sample_config_with_categories = sample_config.copy() sample_config_with_categories["categories"] = {} - with patch('kleinanzeigen_bot.utils.load_dict_if_exists', return_value=None), \ + with patch('kleinanzeigen_bot.utils.dicts.load_dict_if_exists', return_value = None), \ patch.object(LOG, 'warning') as mock_warning, \ - patch('kleinanzeigen_bot.utils.save_dict') as mock_save, \ - patch('kleinanzeigen_bot.utils.load_dict_from_module') as mock_load_module: + patch('kleinanzeigen_bot.utils.dicts.save_dict') as mock_save, \ + patch('kleinanzeigen_bot.utils.dicts.load_dict_from_module') as mock_load_module: mock_load_module.side_effect = [ sample_config_with_categories, # config_defaults.yaml - {'cat1': 'id1'}, # categories.yaml - {'cat2': 'id2'} # categories_old.yaml + {'cat1': 'id1'}, # categories.yaml + {'cat2': 'id2'} # categories_old.yaml ] test_bot.load_config() @@ -279,7 +279,7 @@ login: browser: arguments: [] """ - with open(config_path, "w", encoding="utf-8") as f: + with open(config_path, "w", encoding = "utf-8") as f: f.write(config_content) test_bot.config_file_path = str(config_path) @@ -300,13 +300,13 @@ class TestKleinanzeigenBotAuthentication: @pytest.mark.asyncio async def test_assert_free_ad_limit_not_reached_success(self, configured_bot: KleinanzeigenBot) -> None: """Verify that free ad limit check succeeds when limit not reached.""" - with patch.object(configured_bot, 'web_find', side_effect=TimeoutError): + with patch.object(configured_bot, 'web_find', side_effect = TimeoutError): await configured_bot.assert_free_ad_limit_not_reached() @pytest.mark.asyncio async def test_assert_free_ad_limit_not_reached_limit_reached(self, configured_bot: KleinanzeigenBot) -> None: """Verify that free ad limit check fails when limit is reached.""" - with patch.object(configured_bot, 'web_find', return_value=AsyncMock()): + with patch.object(configured_bot, 'web_find', return_value = AsyncMock()): with pytest.raises(AssertionError) as exc_info: await configured_bot.assert_free_ad_limit_not_reached() assert "Cannot publish more ads" in str(exc_info.value) @@ -314,21 +314,21 @@ class TestKleinanzeigenBotAuthentication: @pytest.mark.asyncio async def test_is_logged_in_returns_true_when_logged_in(self, configured_bot: KleinanzeigenBot) -> None: """Verify that login check returns true when logged in.""" - with patch.object(configured_bot, 'web_text', return_value='Welcome testuser'): + with patch.object(configured_bot, 'web_text', return_value = 'Welcome testuser'): assert await configured_bot.is_logged_in() is True @pytest.mark.asyncio async def test_is_logged_in_returns_false_when_not_logged_in(self, configured_bot: KleinanzeigenBot) -> None: """Verify that login check returns false when not logged in.""" - with patch.object(configured_bot, 'web_text', side_effect=TimeoutError): + with patch.object(configured_bot, 'web_text', side_effect = TimeoutError): assert await configured_bot.is_logged_in() is False @pytest.mark.asyncio async def test_login_flow_completes_successfully(self, configured_bot: KleinanzeigenBot) -> None: """Verify that normal login flow completes successfully.""" with patch.object(configured_bot, 'web_open') as mock_open, \ - patch.object(configured_bot, 'is_logged_in', side_effect=[False, True]) as mock_logged_in, \ - patch.object(configured_bot, 'web_find', side_effect=TimeoutError), \ + patch.object(configured_bot, 'is_logged_in', side_effect = [False, True]) as mock_logged_in, \ + patch.object(configured_bot, 'web_find', side_effect = TimeoutError), \ patch.object(configured_bot, 'web_input') as mock_input, \ patch.object(configured_bot, 'web_click') as mock_click: @@ -343,7 +343,7 @@ class TestKleinanzeigenBotAuthentication: async def test_login_flow_handles_captcha(self, configured_bot: KleinanzeigenBot) -> None: """Verify that login flow handles captcha correctly.""" with patch.object(configured_bot, 'web_open'), \ - patch.object(configured_bot, 'is_logged_in', return_value=False), \ + patch.object(configured_bot, 'is_logged_in', return_value = False), \ patch.object(configured_bot, 'web_find') as mock_find, \ patch.object(configured_bot, 'web_await') as mock_await, \ patch.object(configured_bot, 'web_input'), \ @@ -351,11 +351,11 @@ class TestKleinanzeigenBotAuthentication: patch('kleinanzeigen_bot.ainput') as mock_ainput: mock_find.side_effect = [ - AsyncMock(), # Captcha iframe - TimeoutError(), # Login form - TimeoutError(), # Phone verification - TimeoutError(), # GDPR banner - TimeoutError(), # GDPR banner click + AsyncMock(), # Captcha iframe + TimeoutError(), # Login form + TimeoutError(), # Phone verification + TimeoutError(), # GDPR banner + TimeoutError(), # GDPR banner click ] mock_await.return_value = True mock_ainput.return_value = "" @@ -414,7 +414,7 @@ class TestKleinanzeigenBotBasics: """Test closing browser session.""" mock_close = MagicMock() test_bot.page = MagicMock() # Ensure page exists to trigger cleanup - with patch.object(test_bot, 'close_browser_session', new=mock_close): + with patch.object(test_bot, 'close_browser_session', new = mock_close): test_bot.close_browser_session() # Call directly instead of relying on __del__ mock_close.assert_called_once() @@ -554,7 +554,7 @@ class TestKleinanzeigenBotCommands: async def test_verify_command(self, test_bot: KleinanzeigenBot, tmp_path: Any) -> None: """Test verify command with minimal config.""" config_path = Path(tmp_path) / "config.yaml" - with open(config_path, "w", encoding="utf-8") as f: + with open(config_path, "w", encoding = "utf-8") as f: f.write(""" login: username: test @@ -571,21 +571,21 @@ class TestKleinanzeigenBotAdOperations: @pytest.mark.asyncio async def test_run_delete_command_no_ads(self, test_bot: KleinanzeigenBot, mock_config_setup: None) -> None: # pylint: disable=unused-argument """Test running delete command with no ads.""" - with patch.object(test_bot, 'load_ads', return_value=[]): + with patch.object(test_bot, 'load_ads', return_value = []): await test_bot.run(['script.py', 'delete']) assert test_bot.command == 'delete' @pytest.mark.asyncio async def test_run_publish_command_no_ads(self, test_bot: KleinanzeigenBot, mock_config_setup: None) -> None: # pylint: disable=unused-argument """Test running publish command with no ads.""" - with patch.object(test_bot, 'load_ads', return_value=[]): + with patch.object(test_bot, 'load_ads', return_value = []): await test_bot.run(['script.py', 'publish']) assert test_bot.command == 'publish' @pytest.mark.asyncio async def test_run_download_command_default_selector(self, test_bot: KleinanzeigenBot, mock_config_setup: None) -> None: # pylint: disable=unused-argument """Test running download command with default selector.""" - with patch.object(test_bot, 'download_ads', new_callable=AsyncMock): + with patch.object(test_bot, 'download_ads', new_callable = AsyncMock): await test_bot.run(['script.py', 'download']) assert test_bot.ads_selector == 'new' @@ -603,21 +603,21 @@ class TestKleinanzeigenBotAdManagement: async def test_download_ads_with_specific_ids(self, test_bot: KleinanzeigenBot, mock_config_setup: None) -> None: # pylint: disable=unused-argument """Test downloading ads with specific IDs.""" test_bot.ads_selector = '123,456' - with patch.object(test_bot, 'download_ads', new_callable=AsyncMock): + with patch.object(test_bot, 'download_ads', new_callable = AsyncMock): await test_bot.run(['script.py', 'download', '--ads=123,456']) assert test_bot.ads_selector == '123,456' @pytest.mark.asyncio async def test_run_publish_invalid_selector(self, test_bot: KleinanzeigenBot, mock_config_setup: None) -> None: # pylint: disable=unused-argument """Test running publish with invalid selector.""" - with patch.object(test_bot, 'load_ads', return_value=[]): + with patch.object(test_bot, 'load_ads', return_value = []): await test_bot.run(['script.py', 'publish', '--ads=invalid']) assert test_bot.ads_selector == 'due' @pytest.mark.asyncio async def test_run_download_invalid_selector(self, test_bot: KleinanzeigenBot, mock_config_setup: None) -> None: # pylint: disable=unused-argument """Test running download with invalid selector.""" - with patch.object(test_bot, 'download_ads', new_callable=AsyncMock): + with patch.object(test_bot, 'download_ads', new_callable = AsyncMock): await test_bot.run(['script.py', 'download', '--ads=invalid']) assert test_bot.ads_selector == 'new' @@ -628,7 +628,7 @@ class TestKleinanzeigenBotAdConfiguration: def test_load_config_with_categories(self, test_bot: KleinanzeigenBot, tmp_path: Any) -> None: """Test loading config with custom categories.""" config_path = Path(tmp_path) / "config.yaml" - with open(config_path, "w", encoding="utf-8") as f: + with open(config_path, "w", encoding = "utf-8") as f: f.write(""" login: username: test @@ -651,11 +651,11 @@ categories: # Create a minimal config with empty title to trigger validation ad_cfg = create_ad_config( minimal_ad_config, - title="" # Empty title to trigger length validation + title = "" # Empty title to trigger length validation ) yaml = YAML() - with open(ad_file, "w", encoding="utf-8") as f: + with open(ad_file, "w", encoding = "utf-8") as f: yaml.dump(ad_cfg, f) # Set config file path to tmp_path and use relative path for ad_files @@ -675,11 +675,11 @@ categories: # Create config with invalid price type ad_cfg = create_ad_config( minimal_ad_config, - price_type="INVALID_TYPE" # Invalid price type + price_type = "INVALID_TYPE" # Invalid price type ) yaml = YAML() - with open(ad_file, "w", encoding="utf-8") as f: + with open(ad_file, "w", encoding = "utf-8") as f: yaml.dump(ad_cfg, f) # Set config file path to tmp_path and use relative path for ad_files @@ -699,11 +699,11 @@ categories: # Create config with invalid shipping type ad_cfg = create_ad_config( minimal_ad_config, - shipping_type="INVALID_TYPE" # Invalid shipping type + shipping_type = "INVALID_TYPE" # Invalid shipping type ) yaml = YAML() - with open(ad_file, "w", encoding="utf-8") as f: + with open(ad_file, "w", encoding = "utf-8") as f: yaml.dump(ad_cfg, f) # Set config file path to tmp_path and use relative path for ad_files @@ -723,12 +723,12 @@ categories: # Create config with price for GIVE_AWAY type ad_cfg = create_ad_config( minimal_ad_config, - price_type="GIVE_AWAY", - price=100 # Price should not be set for GIVE_AWAY + price_type = "GIVE_AWAY", + price = 100 # Price should not be set for GIVE_AWAY ) yaml = YAML() - with open(ad_file, "w", encoding="utf-8") as f: + with open(ad_file, "w", encoding = "utf-8") as f: yaml.dump(ad_cfg, f) # Set config file path to tmp_path and use relative path for ad_files @@ -748,12 +748,12 @@ categories: # Create config with FIXED price type but no price ad_cfg = create_ad_config( minimal_ad_config, - price_type="FIXED", - price=None # Missing required price for FIXED type + price_type = "FIXED", + price = None # Missing required price for FIXED type ) yaml = YAML() - with open(ad_file, "w", encoding="utf-8") as f: + with open(ad_file, "w", encoding = "utf-8") as f: yaml.dump(ad_cfg, f) # Set config file path to tmp_path and use relative path for ad_files @@ -773,8 +773,8 @@ categories: # Create config with invalid category and empty description to prevent auto-detection ad_cfg = create_ad_config( minimal_ad_config, - category="999999", # Non-existent category - description=None # Set description to None to trigger validation + category = "999999", # Non-existent category + description = None # Set description to None to trigger validation ) # Mock the config to prevent auto-detection @@ -786,7 +786,7 @@ categories: } yaml = YAML() - with open(ad_file, "w", encoding="utf-8") as f: + with open(ad_file, "w", encoding = "utf-8") as f: yaml.dump(ad_cfg, f) # Set config file path to tmp_path and use relative path for ad_files @@ -804,14 +804,14 @@ class TestKleinanzeigenBotAdDeletion: async def test_delete_ad_by_title(self, test_bot: KleinanzeigenBot, minimal_ad_config: dict[str, Any]) -> None: """Test deleting an ad by title.""" test_bot.page = MagicMock() - test_bot.page.evaluate = AsyncMock(return_value={"statusCode": 200, "content": "{}"}) + test_bot.page.evaluate = AsyncMock(return_value = {"statusCode": 200, "content": "{}"}) test_bot.page.sleep = AsyncMock() # Use minimal config since we only need title for deletion by title ad_cfg = create_ad_config( minimal_ad_config, - title="Test Title", - id=None # Explicitly set id to None for title-based deletion + title = "Test Title", + id = None # Explicitly set id to None for title-based deletion ) published_ads = [ @@ -819,10 +819,10 @@ class TestKleinanzeigenBotAdDeletion: {"title": "Other Title", "id": "11111"} ] - with patch.object(test_bot, 'web_open', new_callable=AsyncMock), \ - patch.object(test_bot, 'web_find', new_callable=AsyncMock) as mock_find, \ - patch.object(test_bot, 'web_click', new_callable=AsyncMock), \ - patch.object(test_bot, 'web_check', new_callable=AsyncMock, return_value=True): + with patch.object(test_bot, 'web_open', new_callable = AsyncMock), \ + patch.object(test_bot, 'web_find', new_callable = AsyncMock) as mock_find, \ + patch.object(test_bot, 'web_click', new_callable = AsyncMock), \ + patch.object(test_bot, 'web_check', new_callable = AsyncMock, return_value = True): mock_find.return_value.attrs = {"content": "some-token"} result = await test_bot.delete_ad(ad_cfg, True, published_ads) assert result is True @@ -831,13 +831,13 @@ class TestKleinanzeigenBotAdDeletion: async def test_delete_ad_by_id(self, test_bot: KleinanzeigenBot, minimal_ad_config: dict[str, Any]) -> None: """Test deleting an ad by ID.""" test_bot.page = MagicMock() - test_bot.page.evaluate = AsyncMock(return_value={"statusCode": 200, "content": "{}"}) + test_bot.page.evaluate = AsyncMock(return_value = {"statusCode": 200, "content": "{}"}) test_bot.page.sleep = AsyncMock() # Create config with ID for deletion by ID ad_cfg = create_ad_config( minimal_ad_config, - id="12345" + id = "12345" ) published_ads = [ @@ -845,10 +845,10 @@ class TestKleinanzeigenBotAdDeletion: {"title": "Other Title", "id": "11111"} ] - with patch.object(test_bot, 'web_open', new_callable=AsyncMock), \ - patch.object(test_bot, 'web_find', new_callable=AsyncMock) as mock_find, \ - patch.object(test_bot, 'web_click', new_callable=AsyncMock), \ - patch.object(test_bot, 'web_check', new_callable=AsyncMock, return_value=True): + with patch.object(test_bot, 'web_open', new_callable = AsyncMock), \ + patch.object(test_bot, 'web_find', new_callable = AsyncMock) as mock_find, \ + patch.object(test_bot, 'web_click', new_callable = AsyncMock), \ + patch.object(test_bot, 'web_check', new_callable = AsyncMock, return_value = True): mock_find.return_value.attrs = {"content": "some-token"} result = await test_bot.delete_ad(ad_cfg, False, published_ads) assert result is True @@ -870,10 +870,10 @@ class TestKleinanzeigenBotAdRepublication: # Create ad config with all necessary fields for republication ad_cfg = create_ad_config( base_ad_config, - id="12345", - updated_on="2024-01-01T00:00:00", - created_on="2024-01-01T00:00:00", - description="Changed description" + id = "12345", + updated_on = "2024-01-01T00:00:00", + created_on = "2024-01-01T00:00:00", + description = "Changed description" ) # Create a temporary directory and file @@ -884,7 +884,7 @@ class TestKleinanzeigenBotAdRepublication: ad_file = ad_dir / "test_ad.yaml" yaml = YAML() - with open(ad_file, "w", encoding="utf-8") as f: + with open(ad_file, "w", encoding = "utf-8") as f: yaml.dump(ad_cfg, f) # Set config file path and use relative path for ad_files @@ -892,7 +892,7 @@ class TestKleinanzeigenBotAdRepublication: test_bot.config['ad_files'] = ["ads/*.yaml"] # Mock the loading of the original ad configuration - with patch('kleinanzeigen_bot.utils.load_dict', side_effect=[ + with patch('kleinanzeigen_bot.utils.dicts.load_dict', side_effect = [ ad_cfg, # First call returns the original ad config {} # Second call for ad_fields.yaml ]): @@ -902,14 +902,14 @@ class TestKleinanzeigenBotAdRepublication: def test_check_ad_republication_no_changes(self, test_bot: KleinanzeigenBot, base_ad_config: dict[str, Any]) -> None: """Test that unchanged ads within interval are not marked for republication.""" current_time = datetime.utcnow() - three_days_ago = (current_time - timedelta(days=3)).isoformat() + three_days_ago = (current_time - timedelta(days = 3)).isoformat() # Create ad config with timestamps for republication check ad_cfg = create_ad_config( base_ad_config, - id="12345", - updated_on=three_days_ago, - created_on=three_days_ago + id = "12345", + updated_on = three_days_ago, + created_on = three_days_ago ) # Calculate hash before making the copy to ensure they match @@ -919,8 +919,8 @@ class TestKleinanzeigenBotAdRepublication: # Mock the config to prevent actual file operations test_bot.config['ad_files'] = ['test.yaml'] - with patch('kleinanzeigen_bot.utils.load_dict_if_exists', return_value=ad_cfg_orig), \ - patch('kleinanzeigen_bot.utils.load_dict', return_value={}): # Mock ad_fields.yaml + with patch('kleinanzeigen_bot.utils.dicts.load_dict_if_exists', return_value = ad_cfg_orig), \ + patch('kleinanzeigen_bot.utils.dicts.load_dict', return_value = {}): # Mock ad_fields.yaml ads_to_publish = test_bot.load_ads() assert len(ads_to_publish) == 0 # No ads should be marked for republication @@ -939,9 +939,9 @@ class TestKleinanzeigenBotShippingOptions: # Create ad config with specific shipping options ad_cfg = create_ad_config( base_ad_config, - shipping_options=["DHL_2", "Hermes_Päckchen"], - created_on="2024-01-01T00:00:00", # Add created_on to prevent KeyError - updated_on="2024-01-01T00:00:00" # Add updated_on for consistency + shipping_options = ["DHL_2", "Hermes_Päckchen"], + created_on = "2024-01-01T00:00:00", # Add created_on to prevent KeyError + updated_on = "2024-01-01T00:00:00" # Add updated_on for consistency ) # Create the original ad config and published ads list @@ -959,26 +959,26 @@ class TestKleinanzeigenBotShippingOptions: ad_file = Path(tmp_path) / "test_ad.yaml" # Mock the necessary web interaction methods - with patch.object(test_bot, 'web_click', new_callable=AsyncMock), \ - patch.object(test_bot, 'web_find', new_callable=AsyncMock) as mock_find, \ - patch.object(test_bot, 'web_select', new_callable=AsyncMock), \ - patch.object(test_bot, 'web_input', new_callable=AsyncMock), \ - patch.object(test_bot, 'web_open', new_callable=AsyncMock), \ - patch.object(test_bot, 'web_sleep', new_callable=AsyncMock), \ - patch.object(test_bot, 'web_check', new_callable=AsyncMock, return_value=True), \ - patch.object(test_bot, 'web_request', new_callable=AsyncMock), \ - patch.object(test_bot, 'web_execute', new_callable=AsyncMock), \ - patch.object(test_bot, 'web_find_all', new_callable=AsyncMock) as mock_find_all, \ - patch.object(test_bot, 'web_await', new_callable=AsyncMock): + with patch.object(test_bot, 'web_click', new_callable = AsyncMock), \ + patch.object(test_bot, 'web_find', new_callable = AsyncMock) as mock_find, \ + patch.object(test_bot, 'web_select', new_callable = AsyncMock), \ + patch.object(test_bot, 'web_input', new_callable = AsyncMock), \ + patch.object(test_bot, 'web_open', new_callable = AsyncMock), \ + patch.object(test_bot, 'web_sleep', new_callable = AsyncMock), \ + patch.object(test_bot, 'web_check', new_callable = AsyncMock, return_value = True), \ + patch.object(test_bot, 'web_request', new_callable = AsyncMock), \ + patch.object(test_bot, 'web_execute', new_callable = AsyncMock), \ + patch.object(test_bot, 'web_find_all', new_callable = AsyncMock) as mock_find_all, \ + patch.object(test_bot, 'web_await', new_callable = AsyncMock): # Mock the shipping options form elements mock_find.side_effect = [ TimeoutError(), # First call in assert_free_ad_limit_not_reached - AsyncMock(attrs={"content": "csrf-token-123"}), # CSRF token - AsyncMock(attrs={"checked": True}), # Size radio button check - AsyncMock(attrs={"value": "Klein"}), # Size dropdown - AsyncMock(attrs={"value": "Paket 2 kg"}), # Package type dropdown - AsyncMock(attrs={"value": "Päckchen"}), # Second package type dropdown + AsyncMock(attrs = {"content": "csrf-token-123"}), # CSRF token + AsyncMock(attrs = {"checked": True}), # Size radio button check + AsyncMock(attrs = {"value": "Klein"}), # Size dropdown + AsyncMock(attrs = {"value": "Paket 2 kg"}), # Package type dropdown + AsyncMock(attrs = {"value": "Päckchen"}), # Second package type dropdown TimeoutError(), # Captcha check ] @@ -986,7 +986,7 @@ class TestKleinanzeigenBotShippingOptions: mock_find_all.return_value = [] # Mock web_check to return True for radio button checked state - with patch.object(test_bot, 'web_check', new_callable=AsyncMock) as mock_check: + with patch.object(test_bot, 'web_check', new_callable = AsyncMock) as mock_check: mock_check.return_value = True # Test through the public interface by publishing an ad diff --git a/tests/unit/test_utils_misc.py b/tests/unit/test_utils_misc.py new file mode 100644 index 0000000..96c15eb --- /dev/null +++ b/tests/unit/test_utils_misc.py @@ -0,0 +1,30 @@ +""" +SPDX-FileCopyrightText: © Sebastian Thomschke and contributors +SPDX-License-Identifier: AGPL-3.0-or-later +SPDX-ArtifactOfProjectHomePage: https://github.com/Second-Hand-Friends/kleinanzeigen-bot/ +""" +import pytest +from kleinanzeigen_bot.utils import misc + + +def test_ensure() -> None: + misc.ensure(True, "TRUE") + misc.ensure("Some Value", "TRUE") + misc.ensure(123, "TRUE") + misc.ensure(-123, "TRUE") + misc.ensure(lambda: True, "TRUE") + + with pytest.raises(AssertionError): + misc.ensure(False, "FALSE") + + with pytest.raises(AssertionError): + misc.ensure(0, "FALSE") + + with pytest.raises(AssertionError): + misc.ensure("", "FALSE") + + with pytest.raises(AssertionError): + misc.ensure(None, "FALSE") + + with pytest.raises(AssertionError): + misc.ensure(lambda: False, "FALSE", timeout = 2)