feat: add type safe Config model

This commit is contained in:
sebthom
2025-05-14 00:30:59 +02:00
committed by Sebastian Thomschke
parent e7a3d46d25
commit 1369da1c34
21 changed files with 1132 additions and 389 deletions

View File

@@ -15,6 +15,7 @@ from wcmatch import glob
from . import extract, resources
from ._version import __version__
from .ads import MAX_DESCRIPTION_LENGTH, calculate_content_hash, get_description_affixes
from .model.config_model import Config
from .utils import dicts, error_handlers, loggers, misc
from .utils.exceptions import CaptchaEncountered
from .utils.files import abspath
@@ -42,7 +43,7 @@ class KleinanzeigenBot(WebScrapingMixin):
self.root_url = "https://www.kleinanzeigen.de"
self.config:dict[str, Any] = {}
self.config:Config
self.config_file_path = abspath("config.yaml")
self.categories:dict[str, str] = {}
@@ -325,7 +326,7 @@ class KleinanzeigenBot(WebScrapingMixin):
ad_files:dict[str, str] = {}
data_root_dir = os.path.dirname(self.config_file_path)
for file_pattern in self.config["ad_files"]:
for file_pattern in self.config.ad_files:
for ad_file in glob.glob(file_pattern, root_dir = data_root_dir, flags = glob.GLOBSTAR | glob.BRACE | glob.EXTGLOB):
if not str(ad_file).endswith("ad_fields.yaml"):
ad_files[abspath(ad_file, relative_to = data_root_dir)] = ad_file
@@ -349,7 +350,7 @@ class KleinanzeigenBot(WebScrapingMixin):
ad_cfg_orig = dicts.load_dict(ad_file, "ad")
ad_cfg = copy.deepcopy(ad_cfg_orig)
dicts.apply_defaults(ad_cfg,
self.config["ad_defaults"],
self.config.ad_defaults.model_dump(),
ignore = lambda k, _: k == "description",
override = lambda _, v: v == "" # noqa: PLC1901 can be simplified to `not v` as an empty string is falsey
)
@@ -462,40 +463,44 @@ class KleinanzeigenBot(WebScrapingMixin):
return ads
def load_config(self) -> None:
config_defaults = dicts.load_dict_from_module(resources, "config_defaults.yaml")
config = dicts.load_dict_if_exists(self.config_file_path, _("config"))
if config is None:
# write default config.yaml if config file does not exist
if not os.path.exists(self.config_file_path):
LOG.warning("Config file %s does not exist. Creating it with default values...", self.config_file_path)
dicts.save_dict(self.config_file_path, config_defaults)
config = {}
default_config = Config.model_construct()
default_config.login.username = ""
default_config.login.password = ""
dicts.save_dict(self.config_file_path, default_config.model_dump(exclude_none = True, exclude = {
"ad_defaults": {
"description" # deprecated
}
}), header = "# yaml-language-server: $schema=https://raw.githubusercontent.com/Second-Hand-Friends/kleinanzeigen-bot/refs/heads/main/schemas/config.schema.json")
self.config = dicts.apply_defaults(config, config_defaults)
config_yaml = dicts.load_dict_if_exists(self.config_file_path, _("config"))
self.config = Config.model_validate(config_yaml, strict = True, context = self.config_file_path)
# load built-in category mappings
self.categories = dicts.load_dict_from_module(resources, "categories.yaml", "categories")
deprecated_categories = dicts.load_dict_from_module(resources, "categories_old.yaml", "categories")
self.categories.update(deprecated_categories)
if self.config["categories"]:
self.categories.update(self.config["categories"])
if self.config.categories:
self.categories.update(self.config.categories)
LOG.info(" -> found %s", pluralize("category", self.categories))
ensure(self.config["login"]["username"], f"[login.username] not specified @ [{self.config_file_path}]")
ensure(self.config["login"]["password"], f"[login.password] not specified @ [{self.config_file_path}]")
self.browser_config.arguments = self.config["browser"]["arguments"]
self.browser_config.binary_location = self.config["browser"]["binary_location"]
self.browser_config.extensions = [abspath(item, relative_to = self.config_file_path) for item in self.config["browser"]["extensions"]]
self.browser_config.use_private_window = self.config["browser"]["use_private_window"]
if self.config["browser"]["user_data_dir"]:
self.browser_config.user_data_dir = abspath(self.config["browser"]["user_data_dir"], relative_to = self.config_file_path)
self.browser_config.profile_name = self.config["browser"]["profile_name"]
# populate browser_config object used by WebScrapingMixin
self.browser_config.arguments = self.config.browser.arguments
self.browser_config.binary_location = self.config.browser.binary_location
self.browser_config.extensions = [abspath(item, relative_to = self.config_file_path) for item in self.config.browser.extensions]
self.browser_config.use_private_window = self.config.browser.use_private_window
if self.config.browser.user_data_dir:
self.browser_config.user_data_dir = abspath(self.config.browser.user_data_dir, relative_to = self.config_file_path)
self.browser_config.profile_name = self.config.browser.profile_name
async def login(self) -> None:
LOG.info("Checking if already logged in...")
await self.web_open(f"{self.root_url}")
if await self.is_logged_in():
LOG.info("Already logged in as [%s]. Skipping login.", self.config["login"]["username"])
LOG.info("Already logged in as [%s]. Skipping login.", self.config.login.username)
return
LOG.info("Opening login page...")
@@ -519,9 +524,9 @@ class KleinanzeigenBot(WebScrapingMixin):
await self.handle_after_login_logic()
async def fill_login_data_and_send(self) -> None:
LOG.info("Logging in as [%s]...", self.config["login"]["username"])
await self.web_input(By.ID, "email", self.config["login"]["username"])
await self.web_input(By.ID, "password", self.config["login"]["password"])
LOG.info("Logging in as [%s]...", self.config.login.username)
await self.web_input(By.ID, "email", self.config.login.username)
await self.web_input(By.ID, "password", self.config.login.password)
await self.web_click(By.CSS_SELECTOR, "form#login-form button[type='submit']")
async def handle_after_login_logic(self) -> None:
@@ -546,13 +551,13 @@ class KleinanzeigenBot(WebScrapingMixin):
try:
# Try to find the standard element first
user_info = await self.web_text(By.CLASS_NAME, "mr-medium")
if self.config["login"]["username"].lower() in user_info.lower():
if self.config.login.username.lower() in user_info.lower():
return True
except TimeoutError:
try:
# If standard element not found, try the alternative
user_info = await self.web_text(By.ID, "user-email")
if self.config["login"]["username"].lower() in user_info.lower():
if self.config.login.username.lower() in user_info.lower():
return True
except TimeoutError:
return False
@@ -567,7 +572,7 @@ class KleinanzeigenBot(WebScrapingMixin):
for (ad_file, ad_cfg, _ad_cfg_orig) in ad_cfgs:
count += 1
LOG.info("Processing %s/%s: '%s' from [%s]...", count, len(ad_cfgs), ad_cfg["title"], ad_file)
await self.delete_ad(ad_cfg, published_ads, delete_old_ads_by_title = self.config["publishing"]["delete_old_ads_by_title"])
await self.delete_ad(ad_cfg, published_ads, delete_old_ads_by_title = self.config.publishing.delete_old_ads_by_title)
await self.web_sleep()
LOG.info("############################################")
@@ -624,7 +629,7 @@ class KleinanzeigenBot(WebScrapingMixin):
await self.publish_ad(ad_file, ad_cfg, ad_cfg_orig, published_ads)
await self.web_await(lambda: self.web_check(By.ID, "checking-done", Is.DISPLAYED), timeout = 5 * 60)
if self.config["publishing"]["delete_old_ads"] == "AFTER_PUBLISH" and not self.keep_old_ads:
if self.config.publishing.delete_old_ads == "AFTER_PUBLISH" and not self.keep_old_ads:
await self.delete_ad(ad_cfg, published_ads, delete_old_ads_by_title = False)
LOG.info("############################################")
@@ -639,8 +644,8 @@ class KleinanzeigenBot(WebScrapingMixin):
"""
await self.assert_free_ad_limit_not_reached()
if self.config["publishing"]["delete_old_ads"] == "BEFORE_PUBLISH" and not self.keep_old_ads:
await self.delete_ad(ad_cfg, published_ads, delete_old_ads_by_title = self.config["publishing"]["delete_old_ads_by_title"])
if self.config.publishing.delete_old_ads == "BEFORE_PUBLISH" and not self.keep_old_ads:
await self.delete_ad(ad_cfg, published_ads, delete_old_ads_by_title = self.config.publishing.delete_old_ads_by_title)
LOG.info("Publishing ad '%s'...", ad_cfg["title"])
@@ -779,9 +784,9 @@ class KleinanzeigenBot(WebScrapingMixin):
"iframe[name^='a-'][src^='https://www.google.com/recaptcha/api2/anchor?']",
timeout = 2)
if self.config.get("captcha", {}).get("auto_restart", False):
if self.config.captcha.auto_restart:
LOG.warning("Captcha recognized - auto-restart enabled, abort run...")
raise CaptchaEncountered(misc.parse_duration(self.config.get("captcha", {}).get("restart_delay", "6h")))
raise CaptchaEncountered(misc.parse_duration(self.config.captcha.restart_delay))
# Fallback: manuell
LOG.warning("############################################")
@@ -1036,7 +1041,7 @@ class KleinanzeigenBot(WebScrapingMixin):
async def assert_free_ad_limit_not_reached(self) -> None:
try:
await self.web_find(By.XPATH, "/html/body/div[1]/form/fieldset[6]/div[1]/header", timeout = 2)
raise AssertionError(f"Cannot publish more ads. The monthly limit of free ads of account {self.config['login']['username']} is reached.")
raise AssertionError(f"Cannot publish more ads. The monthly limit of free ads of account {self.config.login.username} is reached.")
except TimeoutError:
pass