refact: use ruff instead of autopep8,bandit,pylint for linting

This commit is contained in:
sebthom
2025-04-28 12:51:51 +02:00
parent f0b84ab335
commit 376ec76226
27 changed files with 437 additions and 605 deletions

View File

@@ -1,30 +1,26 @@
"""
SPDX-FileCopyrightText: © Sebastian Thomschke and contributors
SPDX-License-Identifier: AGPL-3.0-or-later
SPDX-ArtifactOfProjectHomePage: https://github.com/Second-Hand-Friends/kleinanzeigen-bot/
"""
import asyncio, atexit, copy, importlib.metadata, json, os, re, signal, shutil, sys, textwrap, time
# SPDX-FileCopyrightText: © Sebastian Thomschke and contributors
# SPDX-License-Identifier: AGPL-3.0-or-later
# SPDX-ArtifactOfProjectHomePage: https://github.com/Second-Hand-Friends/kleinanzeigen-bot/
import atexit, copy, json, os, re, signal, sys, textwrap # isort: skip
import getopt # pylint: disable=deprecated-module
import urllib.parse as urllib_parse
import urllib.request as urllib_request
from collections.abc import Iterable
from datetime import datetime
from gettext import gettext as _
from typing import Any, Final
import certifi, colorama, nodriver
import certifi, colorama, nodriver # isort: skip
from ruamel.yaml import YAML
from wcmatch import glob
from . import extract, resources
from ._version import __version__
from .ads import calculate_content_hash, get_description_affixes
from .ads import MAX_DESCRIPTION_LENGTH, calculate_content_hash, get_description_affixes
from .utils import dicts, error_handlers, loggers, misc
from .utils.exceptions import CaptchaEncountered
from .utils.files import abspath
from .utils.i18n import Locale, get_current_locale, pluralize, set_current_locale
from .utils.misc import ainput, ensure, is_frozen, parse_datetime, parse_decimal
from .utils.web_scraping_mixin import By, Element, Is, Page, WebScrapingMixin
from .utils.web_scraping_mixin import By, Element, Is, WebScrapingMixin
# W0406: possibly a bug, see https://github.com/PyCQA/pylint/issues/3933
@@ -287,7 +283,7 @@ class KleinanzeigenBot(WebScrapingMixin):
return True
# Check republication interval
ad_age = datetime.utcnow() - last_updated_on
ad_age = misc.now() - last_updated_on
if ad_age.days <= ad_cfg["republication_interval"]:
LOG.info(
" -> SKIPPED: ad [%s] was last published %d days ago. republication is only required every %s days",
@@ -394,12 +390,12 @@ class KleinanzeigenBot(WebScrapingMixin):
description = self.__get_description_with_affixes(ad_cfg)
# Validate total length
# the message quotes the shared constant so it cannot drift from the limit that is actually checked
ensure(len(description) <= MAX_DESCRIPTION_LENGTH,
    f"Length of ad description including prefix and suffix exceeds {MAX_DESCRIPTION_LENGTH} chars. "
    f"Description length: {len(description)} chars. @ {ad_file}.")
# pylint: disable=cell-var-from-loop
def assert_one_of(path:str, allowed:Iterable[str]) -> None:
    """
    Ensures that the value found in `ad_cfg` under the dot-separated `path` is one of `allowed`.

    :param path: dot-separated key path that is looked up in `ad_cfg`
    :param allowed: the accepted values
    """
    # `ad_cfg` and `ad_file` are captured from the enclosing loop on purpose. The suppression is line-level:
    # a standalone `# ruff: noqa: B023` comment is a file-level exemption and would silence B023 everywhere.
    ensure(dicts.safe_get(ad_cfg, *path.split(".")) in allowed, f"-> property [{path}] must be one of: {allowed} @ [{ad_file}]")  # noqa: B023
def assert_min_len(path:str, minlen:int) -> None:
@@ -441,7 +437,7 @@ class KleinanzeigenBot(WebScrapingMixin):
ad_cfg["category"] = resolved_category_id
if ad_cfg["shipping_costs"]:
ad_cfg["shipping_costs"] = str(round(misc.parse_decimal(ad_cfg["shipping_costs"]), 2))
ad_cfg["shipping_costs"] = str(round(parse_decimal(ad_cfg["shipping_costs"]), 2))
if ad_cfg["images"]:
images = []
@@ -564,17 +560,17 @@ class KleinanzeigenBot(WebScrapingMixin):
published_ads = json.loads(
(await self.web_request(f"{self.root_url}/m-meine-anzeigen-verwalten.json?sort=DEFAULT"))["content"])["ads"]
for (ad_file, ad_cfg, _) in ad_cfgs:
for (ad_file, ad_cfg, _ad_cfg_orig) in ad_cfgs:
count += 1
LOG.info("Processing %s/%s: '%s' from [%s]...", count, len(ad_cfgs), ad_cfg["title"], ad_file)
await self.delete_ad(ad_cfg, self.config["publishing"]["delete_old_ads_by_title"], published_ads)
await self.delete_ad(ad_cfg, published_ads, delete_old_ads_by_title = self.config["publishing"]["delete_old_ads_by_title"])
await self.web_sleep()
LOG.info("############################################")
LOG.info("DONE: Deleted %s", pluralize("ad", count))
LOG.info("############################################")
async def delete_ad(self, ad_cfg: dict[str, Any], delete_old_ads_by_title: bool, published_ads: list[dict[str, Any]]) -> bool:
async def delete_ad(self, ad_cfg: dict[str, Any], published_ads: list[dict[str, Any]], *, delete_old_ads_by_title: bool) -> bool:
LOG.info("Deleting ad '%s' if already present...", ad_cfg["title"])
await self.web_open(f"{self.root_url}/m-meine-anzeigen.html")
@@ -625,7 +621,7 @@ class KleinanzeigenBot(WebScrapingMixin):
await self.web_await(lambda: self.web_check(By.ID, "checking-done", Is.DISPLAYED), timeout = 5 * 60)
if self.config["publishing"]["delete_old_ads"] == "AFTER_PUBLISH" and not self.keep_old_ads:
await self.delete_ad(ad_cfg, False, published_ads)
await self.delete_ad(ad_cfg, published_ads, delete_old_ads_by_title = False)
LOG.info("############################################")
LOG.info("DONE: (Re-)published %s", pluralize("ad", count))
@@ -640,7 +636,7 @@ class KleinanzeigenBot(WebScrapingMixin):
await self.assert_free_ad_limit_not_reached()
if self.config["publishing"]["delete_old_ads"] == "BEFORE_PUBLISH" and not self.keep_old_ads:
await self.delete_ad(ad_cfg, self.config["publishing"]["delete_old_ads_by_title"], published_ads)
await self.delete_ad(ad_cfg, published_ads, delete_old_ads_by_title = self.config["publishing"]["delete_old_ads_by_title"])
LOG.info("Publishing ad '%s'...", ad_cfg["title"])
@@ -828,7 +824,7 @@ class KleinanzeigenBot(WebScrapingMixin):
# Update content hash after successful publication
# Calculate hash on original config to ensure consistent comparison on restart
ad_cfg_orig["content_hash"] = calculate_content_hash(ad_cfg_orig)
ad_cfg_orig["updated_on"] = datetime.utcnow().isoformat()
ad_cfg_orig["updated_on"] = misc.now().isoformat()
if not ad_cfg["created_on"] and not ad_cfg["id"]:
ad_cfg_orig["created_on"] = ad_cfg_orig["updated_on"]
@@ -914,11 +910,11 @@ class KleinanzeigenBot(WebScrapingMixin):
raise TimeoutError(f"Failed to set special attribute [{special_attribute_key}] (not found)") from ex
try:
elem_id = getattr(special_attr_elem.attrs, 'id')
elem_id = special_attr_elem.attrs.id
if special_attr_elem.local_name == 'select':
LOG.debug("Attribute field '%s' seems to be a select...", special_attribute_key)
await self.web_select(By.ID, elem_id, special_attribute_value)
elif getattr(special_attr_elem.attrs, 'type') == 'checkbox':
elif special_attr_elem.attrs.type == 'checkbox':
LOG.debug("Attribute field '%s' seems to be a checkbox...", special_attribute_key)
await self.web_click(By.ID, elem_id)
else:
@@ -950,7 +946,7 @@ class KleinanzeigenBot(WebScrapingMixin):
else:
try:
# no options. only costs. Set custom shipping cost
if not ad_cfg["shipping_costs"] is None:
if ad_cfg["shipping_costs"] is not None:
await self.web_click(By.XPATH,
'//*[contains(@class, "SubSection")]//*//button[contains(@class, "SelectionButton")]')
await self.web_click(By.XPATH, '//*[contains(@class, "CarrierSelectionModal")]//button[contains(text(),"Andere Versandmethoden")]')
@@ -984,7 +980,7 @@ class KleinanzeigenBot(WebScrapingMixin):
except KeyError as ex:
raise KeyError(f"Unknown shipping option(s), please refer to the documentation/README: {ad_cfg['shipping_options']}") from ex
shipping_sizes, shipping_packages = zip(*mapped_shipping_options)
shipping_sizes, shipping_packages = zip(*mapped_shipping_options, strict=False)
try:
shipping_size, = set(shipping_sizes)
@@ -1159,8 +1155,9 @@ class KleinanzeigenBot(WebScrapingMixin):
final_description = final_description.replace("@", "(at)")
# Validate length
ensure(len(final_description) <= 4000,
f"Length of ad description including prefix and suffix exceeds 4000 chars. Description length: {len(final_description)} chars.")
ensure(len(final_description) <= MAX_DESCRIPTION_LENGTH,
f"Length of ad description including prefix and suffix exceeds {MAX_DESCRIPTION_LENGTH} chars. "
f"Description length: {len(final_description)} chars.")
return final_description

View File

@@ -1,9 +1,7 @@
"""
SPDX-FileCopyrightText: © Sebastian Thomschke and contributors
SPDX-License-Identifier: AGPL-3.0-or-later
SPDX-ArtifactOfProjectHomePage: https://github.com/Second-Hand-Friends/kleinanzeigen-bot/
"""
import sys, time
# SPDX-FileCopyrightText: © Sebastian Thomschke and contributors
# SPDX-License-Identifier: AGPL-3.0-or-later
# SPDX-ArtifactOfProjectHomePage: https://github.com/Second-Hand-Friends/kleinanzeigen-bot/
import sys, time # isort: skip
from gettext import gettext as _
import kleinanzeigen_bot
@@ -17,7 +15,6 @@ while True:
try:
kleinanzeigen_bot.main(sys.argv) # runs & returns when finished
sys.exit(0) # not using `break` to prevent process closing issues
except CaptchaEncountered as ex:
delay = ex.restart_delay
print(_("[INFO] Captcha detected. Sleeping %s before restart...") % format_timedelta(delay))

View File

@@ -1,12 +1,13 @@
"""
SPDX-FileCopyrightText: © Jens Bergman and contributors
SPDX-License-Identifier: AGPL-3.0-or-later
SPDX-ArtifactOfProjectHomePage: https://github.com/Second-Hand-Friends/kleinanzeigen-bot/
"""
import hashlib, json, os
from typing import Any
# SPDX-FileCopyrightText: © Jens Bergman and contributors
# SPDX-License-Identifier: AGPL-3.0-or-later
# SPDX-ArtifactOfProjectHomePage: https://github.com/Second-Hand-Friends/kleinanzeigen-bot/
import hashlib, json, os # isort: skip
from typing import Any, Final
from .utils import dicts
MAX_DESCRIPTION_LENGTH:Final[int] = 4000
def calculate_content_hash(ad_cfg: dict[str, Any]) -> str:
"""Calculate a hash for user-modifiable fields of the ad."""
@@ -39,7 +40,7 @@ def calculate_content_hash(ad_cfg: dict[str, Any]) -> str:
return hashlib.sha256(content_str.encode()).hexdigest()
def get_description_affixes(config: dict[str, Any], prefix: bool = True) -> str:
def get_description_affixes(config: dict[str, Any], *, prefix: bool = True) -> str:
"""Get prefix or suffix for description with proper precedence.
This function handles both the new flattened format and legacy nested format:

View File

@@ -1,9 +1,7 @@
"""
SPDX-FileCopyrightText: © Sebastian Thomschke and contributors
SPDX-License-Identifier: AGPL-3.0-or-later
SPDX-ArtifactOfProjectHomePage: https://github.com/Second-Hand-Friends/kleinanzeigen-bot/
"""
import json, mimetypes, os, shutil
# SPDX-FileCopyrightText: © Sebastian Thomschke and contributors
# SPDX-License-Identifier: AGPL-3.0-or-later
# SPDX-ArtifactOfProjectHomePage: https://github.com/Second-Hand-Friends/kleinanzeigen-bot/
import json, mimetypes, os, shutil # isort: skip
import urllib.request as urllib_request
from datetime import datetime
from typing import Any, Final
@@ -24,7 +22,7 @@ class AdExtractor(WebScrapingMixin):
Wrapper class for ad extraction that uses an active bot's browser session to extract specific elements from an ad page.
"""
def __init__(self, browser:Browser, config:dict[str, Any]):
def __init__(self, browser:Browser, config:dict[str, Any]) -> None:
super().__init__()
self.browser = browser
self.config = config
@@ -84,7 +82,7 @@ class AdExtractor(WebScrapingMixin):
if current_img_url is None:
continue
with urllib_request.urlopen(current_img_url) as response: # nosec B310
with urllib_request.urlopen(current_img_url) as response: # noqa: S310 Audit URL open for permitted schemes.
content_type = response.info().get_content_type()
file_ending = mimetypes.guess_extension(content_type)
img_path = f"{directory}/{img_fn_prefix}{img_nr}{file_ending}"
@@ -170,7 +168,7 @@ class AdExtractor(WebScrapingMixin):
# This will now correctly trigger only if the '.Pagination' div itself is not found
LOG.info('No pagination controls found. Assuming single page.')
except Exception as e:
LOG.error("Error during pagination detection: %s", e, exc_info=True)
LOG.exception("Error during pagination detection: %s", e)
LOG.info('Assuming single page due to error during pagination check.')
# --- End Pagination Handling ---
@@ -201,7 +199,7 @@ class AdExtractor(WebScrapingMixin):
LOG.info("Successfully extracted %s refs from page %s.", len(page_refs), current_page)
except Exception as e:
# Log the error if extraction fails for some items, but try to continue
LOG.error("Error extracting refs on page %s: %s", current_page, e, exc_info=True)
LOG.exception("Error extracting refs on page %s: %s", current_page, e)
if not multi_page: # only one iteration for single-page overview
break
@@ -232,7 +230,7 @@ class AdExtractor(WebScrapingMixin):
LOG.info("No pagination controls found after scrolling/waiting. Assuming last page.")
break
except Exception as e:
LOG.error("Error during pagination navigation: %s", e, exc_info=True)
LOG.exception("Error during pagination navigation: %s", e)
break
# --- End Navigation ---
@@ -287,7 +285,7 @@ class AdExtractor(WebScrapingMixin):
# extract basic info
info['type'] = 'OFFER' if 's-anzeige' in self.page.url else 'WANTED'
title:str = await self.web_text(By.ID, 'viewad-title')
LOG.info('Extracting information from ad with title \"%s\"', title)
LOG.info('Extracting information from ad with title "%s"', title)
info['category'] = await self._extract_category_from_ad_page()
info['title'] = title
@@ -389,7 +387,7 @@ class AdExtractor(WebScrapingMixin):
price = int(price_str.replace('.', '').split()[0])
case 'VB':
price_type = 'NEGOTIABLE'
if not price_str == "VB": # can be either 'X € VB', or just 'VB'
if price_str != "VB": # can be either 'X € VB', or just 'VB'
price = int(price_str.replace('.', '').split()[0])
case 'verschenken':
price_type = 'GIVE_AWAY'

View File

@@ -1,16 +1,15 @@
"""
SPDX-FileCopyrightText: © Sebastian Thomschke and contributors
SPDX-License-Identifier: AGPL-3.0-or-later
SPDX-ArtifactOfProjectHomePage: https://github.com/Second-Hand-Friends/kleinanzeigen-bot/
"""
import copy, json, os
# SPDX-FileCopyrightText: © Sebastian Thomschke and contributors
# SPDX-License-Identifier: AGPL-3.0-or-later
# SPDX-ArtifactOfProjectHomePage: https://github.com/Second-Hand-Friends/kleinanzeigen-bot/
import copy, json, os # isort: skip
from collections.abc import Callable
from importlib.resources import read_text as get_resource_as_string
from gettext import gettext as _
from importlib.resources import read_text as get_resource_as_string
from types import ModuleType
from typing import Any, Final
from ruamel.yaml import YAML
from . import files, loggers # pylint: disable=cyclic-import
LOG:Final[loggers.Logger] = loggers.get_logger(__name__)
@@ -112,9 +111,9 @@ def safe_get(a_map:dict[Any, Any], *keys:str) -> Any:
'some_value'
"""
if a_map:
for key in keys:
try:
try:
for key in keys:
a_map = a_map[key]
except (KeyError, TypeError):
return None
except (KeyError, TypeError):
return None
return a_map

View File

@@ -1,11 +1,9 @@
"""
SPDX-FileCopyrightText: © Sebastian Thomschke and contributors
SPDX-License-Identifier: AGPL-3.0-or-later
SPDX-ArtifactOfProjectHomePage: https://github.com/Second-Hand-Friends/kleinanzeigen-bot/
"""
import sys, traceback
# SPDX-FileCopyrightText: © Sebastian Thomschke and contributors
# SPDX-License-Identifier: AGPL-3.0-or-later
# SPDX-ArtifactOfProjectHomePage: https://github.com/Second-Hand-Friends/kleinanzeigen-bot/
import sys, traceback # isort: skip
from types import FrameType, TracebackType
from typing import Final
from typing import Any, Final
from . import loggers

View File

@@ -1,8 +1,6 @@
"""
SPDX-FileCopyrightText: © Sebastian Thomschke and contributors
SPDX-License-Identifier: AGPL-3.0-or-later
SPDX-ArtifactOfProjectHomePage: https://github.com/Second-Hand-Friends/kleinanzeigen-bot/
"""
# SPDX-FileCopyrightText: © Sebastian Thomschke and contributors
# SPDX-License-Identifier: AGPL-3.0-or-later
# SPDX-ArtifactOfProjectHomePage: https://github.com/Second-Hand-Friends/kleinanzeigen-bot/
from datetime import timedelta
@@ -13,6 +11,6 @@ class KleinanzeigenBotError(RuntimeError):
class CaptchaEncountered(KleinanzeigenBotError):
    """
    Signals that a Captcha was detected while auto-restart is enabled.

    The exception carries no message; handlers read `restart_delay` instead.
    """

    def __init__(self, restart_delay: timedelta) -> None:
        """
        :param restart_delay: the delay stored on the exception for the code handling the restart
        """
        super().__init__()
        self.restart_delay: timedelta = restart_delay

View File

@@ -1,8 +1,6 @@
"""
SPDX-FileCopyrightText: © Sebastian Thomschke and contributors
SPDX-License-Identifier: AGPL-3.0-or-later
SPDX-ArtifactOfProjectHomePage: https://github.com/Second-Hand-Friends/kleinanzeigen-bot/
"""
# SPDX-FileCopyrightText: © Sebastian Thomschke and contributors
# SPDX-License-Identifier: AGPL-3.0-or-later
# SPDX-ArtifactOfProjectHomePage: https://github.com/Second-Hand-Friends/kleinanzeigen-bot/
import os

View File

@@ -1,13 +1,12 @@
"""
SPDX-FileCopyrightText: © Sebastian Thomschke and contributors
SPDX-License-Identifier: AGPL-3.0-or-later
SPDX-ArtifactOfProjectHomePage: https://github.com/Second-Hand-Friends/kleinanzeigen-bot/
"""
import ctypes, gettext, inspect, locale, logging, os, sys
# SPDX-FileCopyrightText: © Sebastian Thomschke and contributors
# SPDX-License-Identifier: AGPL-3.0-or-later
# SPDX-ArtifactOfProjectHomePage: https://github.com/Second-Hand-Friends/kleinanzeigen-bot/
import ctypes, gettext, inspect, locale, logging, os, sys # isort: skip
from collections.abc import Sized
from typing import Any, Final, NamedTuple
from kleinanzeigen_bot import resources
from . import dicts, reflect
__all__ = [
@@ -96,7 +95,7 @@ def translate(text:object, caller: inspect.FrameInfo | None) -> str:
if not caller:
return text
global _TRANSLATIONS
global _TRANSLATIONS # noqa: PLW0603 Using the global statement to update `...` is discouraged
if _TRANSLATIONS is None:
try:
_TRANSLATIONS = dicts.load_dict_from_module(resources, f"translations.{_CURRENT_LOCALE[0]}.yaml")
@@ -125,10 +124,10 @@ gettext.gettext = lambda message: translate(_original_gettext(message), reflect.
# Re-point every already imported module that still references the original gettext function
# (as `_` or as `gettext`) at the replacement installed in `gettext.gettext`.
# Iterate over a snapshot of sys.modules: attribute lookups on lazily loading modules can import
# further modules, which would otherwise fail with "RuntimeError: dictionary changed size during iteration".
for module_name, module in list(sys.modules.items()):
    if module is None or module_name in sys.builtin_module_names:
        continue
    if hasattr(module, '_') and module._ is _original_gettext:
        module._ = gettext.gettext  # type: ignore[attr-defined]
    if hasattr(module, 'gettext') and module.gettext is _original_gettext:
        module.gettext = gettext.gettext  # type: ignore[attr-defined]
def get_current_locale() -> Locale:
@@ -136,13 +135,13 @@ def get_current_locale() -> Locale:
def set_current_locale(new_locale:Locale) -> None:
    """
    Makes `new_locale` the current locale.

    If the language differs from the current one, the cached translations are reset to None
    so that `translate()` loads the matching translation file on its next call.

    :param new_locale: the locale to activate
    """
    global _CURRENT_LOCALE, _TRANSLATIONS  # noqa: PLW0603 Using the global statement to update `...` is discouraged
    language_changed = new_locale.language != _CURRENT_LOCALE.language
    _CURRENT_LOCALE = new_locale
    if language_changed:
        _TRANSLATIONS = None
def pluralize(noun:str, count:int | Sized, prefix_with_count:bool = True) -> str:
def pluralize(noun:str, count:int | Sized, *, prefix_with_count:bool = True) -> str:
"""
>>> set_current_locale(Locale("en")) # Setup for doctests
>>> pluralize("field", 1)
@@ -189,7 +188,7 @@ def pluralize(noun:str, count:int | Sized, prefix_with_count:bool = True) -> str
return f"{prefix}{noun}e" # Hund -> Hunde
# English
if len(noun) < 2:
if len(noun) < 2: # noqa: PLR2004 Magic value used in comparison
return f"{prefix}{noun}s"
if noun.endswith(('s', 'sh', 'ch', 'x', 'z')):
return f"{prefix}{noun}es"

View File

@@ -1,15 +1,14 @@
"""
SPDX-FileCopyrightText: © Sebastian Thomschke and contributors
SPDX-License-Identifier: AGPL-3.0-or-later
SPDX-ArtifactOfProjectHomePage: https://github.com/Second-Hand-Friends/kleinanzeigen-bot/
"""
import copy, logging, re, sys
# SPDX-FileCopyrightText: © Sebastian Thomschke and contributors
# SPDX-License-Identifier: AGPL-3.0-or-later
# SPDX-ArtifactOfProjectHomePage: https://github.com/Second-Hand-Friends/kleinanzeigen-bot/
import copy, logging, re, sys # isort: skip
from gettext import gettext as _
from logging import Logger, DEBUG, INFO, WARNING, ERROR, CRITICAL
from logging import CRITICAL, DEBUG, ERROR, INFO, WARNING, Logger
from logging.handlers import RotatingFileHandler
from typing import Any, Final # @UnusedImport
import colorama
from . import i18n, reflect
__all__ = [
@@ -27,6 +26,16 @@ __all__ = [
LOG_ROOT:Final[logging.Logger] = logging.getLogger()
class _MaxLevelFilter(logging.Filter):
    """Log filter that only lets records pass whose level number is at most the configured level."""

    def __init__(self, level: int) -> None:
        """
        :param level: the highest level number that is still accepted
        """
        super().__init__()
        self.level = level

    def filter(self, record: logging.LogRecord) -> bool:
        """Returns True if the level number of the record does not exceed `self.level`."""
        too_severe = record.levelno > self.level
        return not too_severe
def configure_console_logging() -> None:
# if a StreamHandler already exists, do not append it again
if any(isinstance(h, logging.StreamHandler) for h in LOG_ROOT.handlers):
@@ -82,9 +91,7 @@ def configure_console_logging() -> None:
stdout_log = logging.StreamHandler(sys.stderr)
stdout_log.setLevel(DEBUG)
stdout_log.addFilter(type("", (logging.Filter,), {
"filter": lambda rec: rec.levelno <= INFO
}))
stdout_log.addFilter(_MaxLevelFilter(INFO))
stdout_log.setFormatter(formatter)
LOG_ROOT.addHandler(stdout_log)
@@ -97,7 +104,7 @@ def configure_console_logging() -> None:
class LogFileHandle:
"""Encapsulates a log file handler with close and status methods."""
def __init__(self, file_path: str, handler: RotatingFileHandler, logger: logging.Logger):
def __init__(self, file_path: str, handler: RotatingFileHandler, logger: logging.Logger) -> None:
self.file_path = file_path
self._handler:RotatingFileHandler | None = handler
self._logger = logger

View File

@@ -1,11 +1,9 @@
"""
SPDX-FileCopyrightText: © Sebastian Thomschke and contributors
SPDX-License-Identifier: AGPL-3.0-or-later
SPDX-ArtifactOfProjectHomePage: https://github.com/Second-Hand-Friends/kleinanzeigen-bot/
"""
import asyncio, decimal, re, sys, time
# SPDX-FileCopyrightText: © Sebastian Thomschke and contributors
# SPDX-License-Identifier: AGPL-3.0-or-later
# SPDX-ArtifactOfProjectHomePage: https://github.com/Second-Hand-Friends/kleinanzeigen-bot/
import asyncio, decimal, re, sys, time # isort: skip
from collections.abc import Callable
from datetime import datetime, timedelta
from datetime import datetime, timedelta, timezone
from gettext import gettext as _
from typing import Any, TypeVar
@@ -39,6 +37,10 @@ def ensure(condition:Any | bool | Callable[[], bool], error_message:str, timeout
time.sleep(poll_requency)
def now() -> datetime:
    """Returns the current point in time as a timezone-aware datetime in UTC."""
    return datetime.now(tz = timezone.utc)
def is_frozen() -> bool:
"""
>>> is_frozen()
@@ -81,12 +83,27 @@ def parse_decimal(number:float | int | str) -> decimal.Decimal:
raise decimal.DecimalException(f"Invalid number format: {number}") from ex
def parse_datetime(date:datetime | str | None) -> datetime | None:
def parse_datetime(
date: datetime | str | None,
*,
add_timezone_if_missing: bool = True,
use_local_timezone: bool = True
) -> datetime | None:
"""
>>> parse_datetime(datetime(2020, 1, 1, 0, 0))
Parses a datetime object or ISO-formatted string.
Args:
date: The input datetime object or ISO string.
add_timezone_if_missing: If True, add timezone info if missing.
use_local_timezone: If True, use local timezone; otherwise UTC if adding timezone.
Returns:
A timezone-aware or naive datetime object, depending on parameters.
>>> parse_datetime(datetime(2020, 1, 1, 0, 0), add_timezone_if_missing = False)
datetime.datetime(2020, 1, 1, 0, 0)
>>> parse_datetime("2020-01-01T00:00:00")
>>> parse_datetime("2020-01-01T00:00:00", add_timezone_if_missing = False)
datetime.datetime(2020, 1, 1, 0, 0)
>>> parse_datetime(None)
@@ -94,9 +111,16 @@ def parse_datetime(date:datetime | str | None) -> datetime | None:
"""
if date is None:
return None
if isinstance(date, datetime):
return date
return datetime.fromisoformat(date)
dt = date if isinstance(date, datetime) else datetime.fromisoformat(date)
if dt.tzinfo is None and add_timezone_if_missing:
dt = (
dt.astimezone() if use_local_timezone
else dt.replace(tzinfo = timezone.utc)
)
return dt
def parse_duration(text:str) -> timedelta:

View File

@@ -1,8 +1,6 @@
"""
SPDX-FileCopyrightText: © Sebastian Thomschke and contributors
SPDX-License-Identifier: AGPL-3.0-or-later
SPDX-ArtifactOfProjectHomePage: https://github.com/Second-Hand-Friends/kleinanzeigen-bot/
"""
# SPDX-FileCopyrightText: © Sebastian Thomschke and contributors
# SPDX-License-Identifier: AGPL-3.0-or-later
# SPDX-ArtifactOfProjectHomePage: https://github.com/Second-Hand-Friends/kleinanzeigen-bot/
import socket

View File

@@ -1,8 +1,6 @@
"""
SPDX-FileCopyrightText: © Sebastian Thomschke and contributors
SPDX-License-Identifier: AGPL-3.0-or-later
SPDX-ArtifactOfProjectHomePage: https://github.com/Second-Hand-Friends/kleinanzeigen-bot/
"""
# SPDX-FileCopyrightText: © Sebastian Thomschke and contributors
# SPDX-License-Identifier: AGPL-3.0-or-later
# SPDX-ArtifactOfProjectHomePage: https://github.com/Second-Hand-Friends/kleinanzeigen-bot/
import inspect
from typing import Any

View File

@@ -1,26 +1,24 @@
"""
SPDX-FileCopyrightText: © Sebastian Thomschke and contributors
SPDX-License-Identifier: AGPL-3.0-or-later
SPDX-ArtifactOfProjectHomePage: https://github.com/Second-Hand-Friends/kleinanzeigen-bot/
"""
import asyncio, enum, inspect, json, os, platform, secrets, shutil, time
# SPDX-FileCopyrightText: © Sebastian Thomschke and contributors
# SPDX-License-Identifier: AGPL-3.0-or-later
# SPDX-ArtifactOfProjectHomePage: https://github.com/Second-Hand-Friends/kleinanzeigen-bot/
import asyncio, enum, inspect, json, os, platform, secrets, shutil # isort: skip
from collections.abc import Callable, Coroutine, Iterable
from gettext import gettext as _
from typing import cast, Any, Final
from typing import Any, Final, cast
try:
from typing import Never # type: ignore[attr-defined,unused-ignore] # mypy
except ImportError:
from typing import NoReturn as Never # Python <3.11
import nodriver, psutil
import nodriver, psutil # isort: skip
from nodriver.core.browser import Browser
from nodriver.core.config import Config
from nodriver.core.element import Element
from nodriver.core.tab import Tab as Page
from . import loggers, net
from .misc import ensure, T
from .misc import T, ensure
__all__ = [
"Browser",
@@ -70,8 +68,8 @@ class WebScrapingMixin:
def __init__(self) -> None:
    """Creates the browser configuration; no browser and no page are attached at this point."""
    self.browser_config:Final[BrowserConfig] = BrowserConfig()
    # both attributes start out as None although they are annotated as non-optional, hence the pyright suppressions
    self.browser:Browser = None  # pyright: ignore[reportAttributeAccessIssue]
    self.page:Page = None  # pyright: ignore[reportAttributeAccessIssue]
async def create_browser_session(self) -> None:
LOG.info("Creating Browser session...")
@@ -96,7 +94,7 @@ class WebScrapingMixin:
if remote_port > 0:
LOG.info("Using existing browser process at %s:%s", remote_host, remote_port)
ensure(net.is_port_open(remote_host, remote_port),
f"Browser process not reachable at {remote_host}:{remote_port}. " +
f"Browser process not reachable at {remote_host}:{remote_port}. "
f"Start the browser with --remote-debugging-port={remote_port} or remove this port from your config.yaml")
cfg = Config(
browser_executable_path = self.browser_config.binary_location # actually not necessary but nodriver fails without
@@ -208,14 +206,14 @@ class WebScrapingMixin:
def close_browser_session(self) -> None:
    """
    Stops the current browser, if there is one, and kills those of its child processes
    that are still running afterwards. Resets `self.page` and `self.browser` to None.
    """
    if not self.browser:
        return

    LOG.debug("Closing Browser session...")
    self.page = None  # pyright: ignore[reportAttributeAccessIssue]

    # collect the child processes before stopping the browser so they can still be checked afterwards
    root_process = psutil.Process(self.browser._process_pid)  # noqa: SLF001 Private member accessed
    child_processes:list[psutil.Process] = root_process.children()
    self.browser.stop()

    for child in child_processes:
        if child.is_running():
            child.kill()  # terminate orphaned browser processes
    self.browser = None  # pyright: ignore[reportAttributeAccessIssue]
def get_compatible_browser(self) -> str:
match platform.system():
@@ -236,15 +234,15 @@ class WebScrapingMixin:
case "Windows":
browser_paths = [
os.environ.get("ProgramFiles", "C:\\Program Files") + r'\Microsoft\Edge\Application\msedge.exe',
os.environ.get("ProgramFiles(x86)", "C:\\Program Files (x86)") + r'\Microsoft\Edge\Application\msedge.exe',
os.environ.get("PROGRAMFILES", "C:\\Program Files") + r'\Microsoft\Edge\Application\msedge.exe',
os.environ.get("PROGRAMFILES(X86)", "C:\\Program Files (x86)") + r'\Microsoft\Edge\Application\msedge.exe',
os.environ["ProgramFiles"] + r'\Chromium\Application\chrome.exe',
os.environ["ProgramFiles(x86)"] + r'\Chromium\Application\chrome.exe',
os.environ["PROGRAMFILES"] + r'\Chromium\Application\chrome.exe',
os.environ["PROGRAMFILES(X86)"] + r'\Chromium\Application\chrome.exe',
os.environ["LOCALAPPDATA"] + r'\Chromium\Application\chrome.exe',
os.environ["ProgramFiles"] + r'\Chrome\Application\chrome.exe',
os.environ["ProgramFiles(x86)"] + r'\Chrome\Application\chrome.exe',
os.environ["PROGRAMFILES"] + r'\Chrome\Application\chrome.exe',
os.environ["PROGRAMFILES(X86)"] + r'\Chrome\Application\chrome.exe',
os.environ["LOCALAPPDATA"] + r'\Chrome\Application\chrome.exe',
shutil.which("msedge.exe"),
@@ -277,7 +275,7 @@ class WebScrapingMixin:
ex:Exception | None = None
try:
result_raw = condition()
result:T = (await result_raw) if inspect.isawaitable(result_raw) else result_raw
result:T = cast(T, await result_raw if inspect.isawaitable(result_raw) else result_raw)
if result:
return result
except Exception as ex1:
@@ -359,11 +357,11 @@ class WebScrapingMixin:
_prev_jscode:str = getattr(self.__class__.web_execute, "_prev_jscode", "")
if not (jscode == _prev_jscode or (jscode.startswith("window.scrollTo") and _prev_jscode.startswith("window.scrollTo"))):
LOG.debug("web_execute(`%s`) = `%s`", jscode, result)
self.__class__.web_execute._prev_jscode = jscode # type: ignore[attr-defined] # pylint: disable=protected-access
self.__class__.web_execute._prev_jscode = jscode # type: ignore[attr-defined] # noqa: SLF001 Private member accessed
return result
async def web_find(self, selector_type:By, selector_value:str, *, parent:Element = None, timeout:int | float = 5) -> Element:
async def web_find(self, selector_type:By, selector_value:str, *, parent:Element | None = None, timeout:int | float = 5) -> Element:
"""
Locates an HTML element by the given selector type and value.
@@ -408,7 +406,7 @@ class WebScrapingMixin:
raise AssertionError(_("Unsupported selector type: %s") % selector_type)
async def web_find_all(self, selector_type:By, selector_value:str, *, parent:Element = None, timeout:int | float = 5) -> list[Element]:
async def web_find_all(self, selector_type:By, selector_value:str, *, parent:Element | None = None, timeout:int | float = 5) -> list[Element]:
"""
Locates all HTML elements matching the given selector type and value.
@@ -460,7 +458,7 @@ class WebScrapingMixin:
await self.web_sleep()
return input_field
async def web_open(self, url:str, *, timeout:int | float = 15000, reload_if_already_open:bool = False) -> None:
async def web_open(self, url:str, *, timeout:int | float = 15_000, reload_if_already_open:bool = False) -> None:
"""
:param url: url to open in browser
:param timeout: timespan in seconds within which the page needs to be loaded
@@ -475,7 +473,7 @@ class WebScrapingMixin:
await self.web_await(lambda: self.web_execute("document.readyState == 'complete'"), timeout = timeout,
timeout_error_message = f"Page did not finish loading within {timeout} seconds.")
async def web_text(self, selector_type:By, selector_value:str, *, parent:Element = None, timeout:int | float = 5) -> str:
async def web_text(self, selector_type:By, selector_value:str, *, parent:Element | None = None, timeout:int | float = 5) -> str:
return str(await (await self.web_find(selector_type, selector_value, parent = parent, timeout = timeout)).apply("""
function (elem) {
let sel = window.getSelection()
@@ -489,10 +487,11 @@ class WebScrapingMixin:
}
"""))
async def web_sleep(self, min_ms:int = 1_000, max_ms:int = 2_500) -> None:
    """
    Pauses for a random duration of at least `min_ms` and less than `max_ms` milliseconds.

    :param min_ms: lower bound of the pause in milliseconds
    :param max_ms: upper bound of the pause in milliseconds; if it is not greater than `min_ms`,
        the pause lasts exactly `min_ms` milliseconds
    """
    # A conditional expression instead of `cond and a or b`: with min_ms == 0 the and/or form fell through
    # to secrets.randbelow() with a non-positive bound, which raises a ValueError.
    duration = min_ms if max_ms <= min_ms else secrets.randbelow(max_ms - min_ms) + min_ms
    LOG.log(loggers.INFO if duration > 1_500 else loggers.DEBUG,  # noqa: PLR2004 Magic value used in comparison
        " ... pausing for %d ms ...", duration)
    await self.page.sleep(duration / 1_000)
async def web_request(self, url:str, method:str = "GET", valid_response_codes:int | Iterable[int] = 200,
headers:dict[str, str] | None = None) -> dict[str, Any]:
@@ -524,7 +523,7 @@ class WebScrapingMixin:
return response
# pylint: enable=dangerous-default-value
async def web_scroll_page_down(self, scroll_length: int = 10, scroll_speed: int = 10000, scroll_back_top: bool = False) -> None:
async def web_scroll_page_down(self, scroll_length: int = 10, scroll_speed: int = 10_000, *, scroll_back_top: bool = False) -> None:
"""
Smoothly scrolls the current web page down.
@@ -537,13 +536,13 @@ class WebScrapingMixin:
while current_y_pos < bottom_y_pos: # scroll in steps until bottom reached
current_y_pos += scroll_length
await self.web_execute(f'window.scrollTo(0, {current_y_pos})') # scroll one step
time.sleep(scroll_length / scroll_speed)
await asyncio.sleep(scroll_length / scroll_speed)
if scroll_back_top: # scroll back to top in same style
while current_y_pos > 0:
current_y_pos -= scroll_length
await self.web_execute(f'window.scrollTo(0, {current_y_pos})')
time.sleep(scroll_length / scroll_speed / 2) # double speed
await asyncio.sleep(scroll_length / scroll_speed / 2) # double speed
async def web_select(self, selector_type:By, selector_value:str, selected_value:Any, timeout:int | float = 5) -> Element:
"""